9aman commented on code in PR #14798:
URL: https://github.com/apache/pinot/pull/14798#discussion_r1932092974
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1764,6 +1760,193 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
+ // Functional interface for upload attempts
+ @FunctionalInterface
+ private interface UploadAttempt {
+ void upload()
+ throws Exception;
+ }
+
+ private void uploadToDeepStoreWithFallback(URI uri, String segmentName,
String rawTableName,
+ SegmentZKMetadata segmentZKMetadata, PinotFS pinotFS)
+ throws Exception {
+
+ // Define upload methods in order of preference
+ List<UploadAttempt> uploadAttempts = Arrays.asList(
+ // Primary method
+ () -> {
+ String serverUploadRequestUrl = getUploadUrl(uri,
"uploadLLCSegmentToDeepStore");
+ LOGGER.info("Ask server to upload LLC segment {} to deep store by
this path: {}", segmentName,
+ serverUploadRequestUrl);
+ SegmentZKMetadata uploadedMetadata =
_fileUploadDownloadClient.uploadLLCToSegmentStoreWithZKMetadata(
+ serverUploadRequestUrl);
+ handleMetadataUpload(segmentName, rawTableName, segmentZKMetadata,
uploadedMetadata, pinotFS);
+ },
+ // First fallback
+ () -> {
+ String serverUploadRequestUrl = getUploadUrl(uri,
"uploadLLCSegment");
+ LOGGER.info("Ask server to upload LLC segment {} to deep store by
this path: {}", segmentName,
+ serverUploadRequestUrl);
+ TableLLCSegmentUploadResponse response =
+
_fileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl);
+ handleLLCUpload(segmentName, rawTableName, segmentZKMetadata,
response, pinotFS);
+ },
+ // Legacy fallback
+ () -> {
+ String serverUploadRequestUrl = getUploadUrl(uri, "upload");
+ LOGGER.info("Ask server to upload LLC segment {} to deep store by
this path: {}", segmentName,
+ serverUploadRequestUrl);
+
+ String tempUrl =
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+ handleBasicUpload(segmentName, rawTableName, segmentZKMetadata,
tempUrl, pinotFS);
+ }
+ );
+
+ // Try each method in sequence until one succeeds
+ Exception lastException = null;
+ for (UploadAttempt attempt : uploadAttempts) {
+ try {
+ attempt.upload();
+ return; // Success, exit the method
+ } catch (Exception e) {
+ lastException = e;
+ LOGGER.warn("Upload attempt failed for segment {}, trying next
method", segmentName, e);
+ }
+ }
+
+ // All attempts for segment upload failed
+ throw new Exception("All upload attempts failed for segment " +
segmentName, lastException);
+ }
+
+ private String getUploadUrl(URI uri, String endpoint) {
+ return String.format("%s/%s?uploadTimeoutMs=%d",
+ uri.toString(), endpoint, _deepstoreUploadRetryTimeoutMs);
+ }
+
+ private void handleMetadataUpload(String segmentName, String rawTableName,
SegmentZKMetadata currentMetadata,
+ SegmentZKMetadata uploadedMetadata,
+ PinotFS pinotFS)
+ throws Exception {
+
+ String downloadUrl = moveSegmentFile(rawTableName, segmentName,
uploadedMetadata.getDownloadUrl(), pinotFS);
+ LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, downloadUrl);
+ currentMetadata.setDownloadUrl(downloadUrl);
+
+ if (uploadedMetadata.getCrc() != currentMetadata.getCrc()) {
+ LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}",
segmentName,
+ uploadedMetadata.getCrc(), currentMetadata.getCrc());
+ updateSegmentMetadata(currentMetadata, uploadedMetadata);
+ }
+ }
+
+ private void handleLLCUpload(String segmentName, String rawTableName,
+ SegmentZKMetadata currentMetadata, TableLLCSegmentUploadResponse
response,
+ PinotFS pinotFS)
+ throws Exception {
+
+ String downloadUrl = moveSegmentFile(rawTableName, segmentName,
response.getDownloadUrl(), pinotFS);
+ LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, downloadUrl);
+ currentMetadata.setDownloadUrl(downloadUrl);
+
+ if (response.getCrc() != currentMetadata.getCrc()) {
+ LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}",
segmentName,
+ response.getCrc(), currentMetadata.getCrc());
+ currentMetadata.setCrc(response.getCrc());
+ }
+ }
+
+ private void handleBasicUpload(String segmentName, String rawTableName,
+ SegmentZKMetadata metadata, String tempDownloadUrl, PinotFS pinotFS)
+ throws Exception {
+
+ String downloadUrl = moveSegmentFile(rawTableName, segmentName,
tempDownloadUrl, pinotFS);
+ metadata.setDownloadUrl(downloadUrl);
+ }
+
+ /**
+ * Updates the segment metadata in ZooKeeper with information from the
uploaded segment.
+ *
+ * For pauseless consumption scenarios:
+ * - When segment status is COMMITTING, it indicates a previous segment
commit metadata update failed
+ * - In this case, we perform a full metadata update including time
boundaries, index details, and partition info
+ * - Finally, the segment status is marked as DONE to indicate successful
completion
+ *
+ * For regular consumption:
+ * - Only the CRC value is updated
+ *
+ * @param segmentZKMetadata Current segment metadata stored in ZooKeeper
that needs to be updated
+ * @param uploadedSegmentZKMetadata New metadata from the successfully
uploaded segment
+ */
+ private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata,
SegmentZKMetadata uploadedSegmentZKMetadata) {
+ if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
+ LOGGER.info("Updating additional metadata in ZK for segment {} as
pauseless is enabled",
+ segmentZKMetadata.getSegmentName());
+
segmentZKMetadata.setStartTime(uploadedSegmentZKMetadata.getStartTimeMs());
Review Comment:
Have update the function to copy all the simpleFields. The
SegementZkMetadata returned by the uploadLLSegmentToDeepStore now also the
previously missing fields like `creationTime` and `numReplicas`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]