Jackie-Jiang commented on code in PR #14798:
URL: https://github.com/apache/pinot/pull/14798#discussion_r1931399969
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -955,6 +963,167 @@ public TableLLCSegmentUploadResponse uploadLLCSegmentV2(
}
}
+  /**
+   * Upload a low level consumer segment to the segment store and return the segment download URL, CRC and
+   * other segment metadata. This endpoint is used when the segment store copy is unavailable for committed
+   * low level consumer segments.
+   * Please note that invocation of this endpoint may cause query performance to suffer, since we tar up the
+   * segment to upload it.
+   *
+   * @see <a href="https://tinyurl.com/f63ru4sb"></a>
+   * @param realtimeTableNameWithType table name with type
+   * @param segmentName name of the segment to be uploaded
+   * @param timeoutMs timeout for the segment upload to the deep store. If this is negative, the default
+   *                  timeout is used.
+   * @return full URL where the segment is uploaded, CRC, segment name and other segment metadata
+   * @throws Exception if an error occurred during the segment upload
+   */
+  @POST
+  @Path("/segments/{realtimeTableNameWithType}/{segmentName}/uploadLLCSegmentToDeepStore")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Upload a low level consumer segment to segment store and return the segment download url, "
+      + "crc and other segment metadata",
+      notes = "Upload a low level consumer segment to segment store and return the segment download url, crc "
+          + "and other segment metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class),
+      @ApiResponse(code = 400, message = "Bad request", response = ErrorInfo.class)
+  })
+  public String uploadLLCSegmentToDeepStore(
+      @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableNameWithType")
+          String realtimeTableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
+      @QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs,
+      @Context HttpHeaders headers)
+      throws Exception {
+    realtimeTableNameWithType = DatabaseUtils.translateTableName(realtimeTableNameWithType, headers);
+    LOGGER.info("Received a request to upload low level consumer segment {} for table {}", segmentName,
+        realtimeTableNameWithType);
+
+    // Check that it's a realtime table
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableNameWithType);
+    if (TableType.REALTIME != tableType) {
+      throw new WebApplicationException(
+          String.format("Cannot upload low level consumer segment for a non-realtime table: %s",
+              realtimeTableNameWithType), Response.Status.BAD_REQUEST);
+    }
+
+    // Check that the segment is a low level consumer segment
+    if (!LLCSegmentName.isLLCSegment(segmentName)) {
+      throw new WebApplicationException(String.format("Segment %s is not a low level consumer segment", segmentName),
+          Response.Status.BAD_REQUEST);
+    }
+
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, realtimeTableNameWithType);
+    SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist", realtimeTableNameWithType, segmentName),
+          Response.Status.NOT_FOUND);
+    }
+    if (!(segmentDataManager instanceof ImmutableSegmentDataManager)) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist on disk", realtimeTableNameWithType, segmentName),
+          Response.Status.NOT_FOUND);
+    }
+    ImmutableSegmentDataManager immutableSegmentDataManager = (ImmutableSegmentDataManager) segmentDataManager;
+    SegmentMetadataImpl segmentMetadata =
+        (SegmentMetadataImpl) immutableSegmentDataManager.getSegment().getSegmentMetadata();
+    SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(segmentMetadata);
Review Comment:
We should be able to reuse the methods in `ZKMetadataUtils`. You can move
`ZKMetadataUtils` into `pinot-common`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1361,23 +1394,29 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConf
       SegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
       String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
       LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
+      // This is the expected segment status after completion of the first of the 3 steps of the segment
+      // commit protocol. The status in step one is updated to:
+      // 1. DONE for normal consumption
+      // 2. COMMITTING for pauseless consumption
+      Status statusPostSegmentMetadataUpdate =
+          PauselessConsumptionUtils.isPauselessEnabled(tableConfig) ? Status.COMMITTING : Status.DONE;
Review Comment:
Check this upfront, instead of on each partition
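To illustrate the suggestion, here is a minimal stand-alone sketch of hoisting the pauseless check out of the per-partition loop; `Status`, `expectedStatus` and the partition list are illustrative stand-ins, not the real Pinot types:

```java
import java.util.List;

// Sketch: compute the expected post-commit status once, before iterating
// partitions, instead of re-evaluating the pauseless check per partition.
public class UpfrontStatusCheck {
  enum Status { DONE, COMMITTING }

  static Status expectedStatus(boolean pauselessEnabled) {
    // Mirrors: isPauselessEnabled(tableConfig) ? Status.COMMITTING : Status.DONE
    return pauselessEnabled ? Status.COMMITTING : Status.DONE;
  }

  public static void main(String[] args) {
    boolean pauselessEnabled = true; // would come from the table config, once
    Status statusPostSegmentMetadataUpdate = expectedStatus(pauselessEnabled);
    for (String partition : List.of("p0", "p1", "p2")) {
      // each partition reuses the precomputed status
      System.out.println(partition + " -> " + statusPostSegmentMetadataUpdate);
    }
  }
}
```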
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1764,6 +1760,193 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
     }
   }
+  // Functional interface for upload attempts
+  @FunctionalInterface
+  private interface UploadAttempt {
+    void upload()
+        throws Exception;
+  }
+
+  private void uploadToDeepStoreWithFallback(URI uri, String segmentName, String rawTableName,
+      SegmentZKMetadata segmentZKMetadata, PinotFS pinotFS)
+      throws Exception {
+
+    // Define upload methods in order of preference
+    List<UploadAttempt> uploadAttempts = Arrays.asList(
+        // Primary method
+        () -> {
+          String serverUploadRequestUrl = getUploadUrl(uri, "uploadLLCSegmentToDeepStore");
+          LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
+              serverUploadRequestUrl);
+          SegmentZKMetadata uploadedMetadata =
+              _fileUploadDownloadClient.uploadLLCToSegmentStoreWithZKMetadata(serverUploadRequestUrl);
+          handleMetadataUpload(segmentName, rawTableName, segmentZKMetadata, uploadedMetadata, pinotFS);
+        },
+        // First fallback
+        () -> {
+          String serverUploadRequestUrl = getUploadUrl(uri, "uploadLLCSegment");
+          LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
+              serverUploadRequestUrl);
+          TableLLCSegmentUploadResponse response =
+              _fileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl);
+          handleLLCUpload(segmentName, rawTableName, segmentZKMetadata, response, pinotFS);
+        },
+        // Legacy fallback
+        () -> {
+          String serverUploadRequestUrl = getUploadUrl(uri, "upload");
+          LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
+              serverUploadRequestUrl);
+          String tempUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+          handleBasicUpload(segmentName, rawTableName, segmentZKMetadata, tempUrl, pinotFS);
+        }
+    );
+
+    // Try each method in sequence until one succeeds
+    Exception lastException = null;
+    for (UploadAttempt attempt : uploadAttempts) {
+      try {
+        attempt.upload();
+        return; // Success, exit the method
+      } catch (Exception e) {
+        lastException = e;
+        LOGGER.warn("Upload attempt failed for segment {}, trying next method", segmentName, e);
+      }
+    }
+
+    // All attempts for segment upload failed
+    throw new Exception("All upload attempts failed for segment " + segmentName, lastException);
+  }
+
+  private String getUploadUrl(URI uri, String endpoint) {
+    return String.format("%s/%s?uploadTimeoutMs=%d", uri.toString(), endpoint, _deepstoreUploadRetryTimeoutMs);
+  }
+
+  private void handleMetadataUpload(String segmentName, String rawTableName, SegmentZKMetadata currentMetadata,
+      SegmentZKMetadata uploadedMetadata, PinotFS pinotFS)
+      throws Exception {
+    String downloadUrl = moveSegmentFile(rawTableName, segmentName, uploadedMetadata.getDownloadUrl(), pinotFS);
+    LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, downloadUrl);
+    currentMetadata.setDownloadUrl(downloadUrl);
+
+    if (uploadedMetadata.getCrc() != currentMetadata.getCrc()) {
+      LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}", segmentName,
+          uploadedMetadata.getCrc(), currentMetadata.getCrc());
+      updateSegmentMetadata(currentMetadata, uploadedMetadata);
+    }
+  }
+
+  private void handleLLCUpload(String segmentName, String rawTableName, SegmentZKMetadata currentMetadata,
+      TableLLCSegmentUploadResponse response, PinotFS pinotFS)
+      throws Exception {
+    String downloadUrl = moveSegmentFile(rawTableName, segmentName, response.getDownloadUrl(), pinotFS);
+    LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, downloadUrl);
+    currentMetadata.setDownloadUrl(downloadUrl);
+
+    if (response.getCrc() != currentMetadata.getCrc()) {
+      LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}", segmentName,
+          response.getCrc(), currentMetadata.getCrc());
+      currentMetadata.setCrc(response.getCrc());
+    }
+  }
+
+  private void handleBasicUpload(String segmentName, String rawTableName, SegmentZKMetadata metadata,
+      String tempDownloadUrl, PinotFS pinotFS)
+      throws Exception {
+    String downloadUrl = moveSegmentFile(rawTableName, segmentName, tempDownloadUrl, pinotFS);
+    metadata.setDownloadUrl(downloadUrl);
+  }
+
+  /**
+   * Updates the segment metadata in ZooKeeper with information from the uploaded segment.
+   *
+   * For pauseless consumption scenarios:
+   * - When segment status is COMMITTING, it indicates a previous segment commit metadata update failed
+   * - In this case, we perform a full metadata update including time boundaries, index details, and partition info
+   * - Finally, the segment status is marked as DONE to indicate successful completion
+   *
+   * For regular consumption:
+   * - Only the CRC value is updated
+   *
+   * @param segmentZKMetadata Current segment metadata stored in ZooKeeper that needs to be updated
+   * @param uploadedSegmentZKMetadata New metadata from the successfully uploaded segment
+   */
+  private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata,
+      SegmentZKMetadata uploadedSegmentZKMetadata) {
+    if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
+      LOGGER.info("Updating additional metadata in ZK for segment {} as pauseless is enabled",
+          segmentZKMetadata.getSegmentName());
+      segmentZKMetadata.setStartTime(uploadedSegmentZKMetadata.getStartTimeMs());
Review Comment:
Should we simply put all simple fields from the uploaded ZK metadata into
current?
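A minimal stand-alone sketch of "put all simple fields from the uploaded ZK metadata into current"; the map-backed record below is an illustrative stand-in, not the real `SegmentZKMetadata` API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: bulk-copy the simple fields from the uploaded metadata over the
// current record, instead of hand-picking setStartTime/setEndTime/... setters.
public class SimpleFieldCopy {
  static Map<String, String> copySimpleFields(Map<String, String> current, Map<String, String> uploaded) {
    Map<String, String> merged = new HashMap<>(current);
    merged.putAll(uploaded); // every uploaded simple field wins
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> current = new HashMap<>(Map.of("crc", "111", "status", "COMMITTING"));
    Map<String, String> uploaded = Map.of("crc", "222", "status", "DONE", "startTimeMs", "100");
    System.out.println(copySimpleFields(current, uploaded));
  }
}
```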
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1764,6 +1760,193 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
     }
   }
+  // Functional interface for upload attempts
+  @FunctionalInterface
+  private interface UploadAttempt {
+    void upload()
+        throws Exception;
+  }
+
+  private void uploadToDeepStoreWithFallback(URI uri, String segmentName, String rawTableName,
+      SegmentZKMetadata segmentZKMetadata, PinotFS pinotFS)
+      throws Exception {
+
+    // Define upload methods in order of preference
+    List<UploadAttempt> uploadAttempts = Arrays.asList(
+        // Primary method
Review Comment:
If the primary method fails, we should abort the committing segment fix, because the fallbacks won't be
able to handle that case
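A stand-alone sketch of that control flow: for a COMMITTING (pauseless) segment, a primary-path failure aborts instead of falling through to the legacy endpoints. All names here are illustrative stand-ins, not the real Pinot signatures:

```java
// Sketch: only the primary upload path returns the ZK metadata needed to finish
// a pauseless commit, so a COMMITTING segment must not fall back on failure.
public class AbortOnPrimaryFailure {
  enum Status { DONE, COMMITTING }

  static String upload(Status status, boolean primarySucceeds) throws Exception {
    try {
      if (!primarySucceeds) {
        throw new Exception("primary upload failed");
      }
      return "primary";
    } catch (Exception e) {
      if (status == Status.COMMITTING) {
        // Fallbacks cannot repair a COMMITTING segment: abort the fix instead.
        throw e;
      }
      return "fallback"; // safe only for DONE segments
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(upload(Status.DONE, false));      // DONE segment may fall back
    System.out.println(upload(Status.COMMITTING, true)); // primary path succeeded
  }
}
```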
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -543,13 +555,22 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     SegmentZKMetadata committingSegmentZKMetadata =
         updateCommittingSegmentMetadata(realtimeTableName, committingSegmentDescriptor, isStartMetadata);
+    // Used to inject failure for testing. RealtimeSegmentValidationManager should be able to fix
+    // segments that encounter a failure at this stage of the commit protocol.
+    FailureInjectionUtils.injectFailure(FailureInjectionUtils.FAULT_BEFORE_NEW_SEGMENT_METADATA_CREATION,
Review Comment:
+1 on this. We don't want to pollute a production class and introduce overhead for testing purposes. One
way to solve this is to add callback methods between the steps, and then override them in a test class.
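The callback-override pattern could be sketched as follows; class and method names are illustrative only, not the actual Pinot code:

```java
// Sketch: a no-op hook between commit steps, overridden in a test subclass to
// simulate a crash, so production code carries no failure-injection calls.
public class CommitProtocol {
  // Called between step 1 (ZK metadata update) and step 2 (new segment creation).
  protected void preNewSegmentMetadataCreation() {
    // No-op in production; zero overhead beyond a virtual call.
  }

  public String commit() {
    String state = "metadata-updated";
    preNewSegmentMetadataCreation(); // test seam
    return state + ",new-segment-created";
  }

  public static void main(String[] args) {
    System.out.println(new CommitProtocol().commit());

    // Test subclass injects a failure between the commit steps.
    CommitProtocol failing = new CommitProtocol() {
      @Override
      protected void preNewSegmentMetadataCreation() {
        throw new IllegalStateException("injected failure between commit steps");
      }
    };
    try {
      failing.commit();
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```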
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]