9aman commented on code in PR #14798:
URL: https://github.com/apache/pinot/pull/14798#discussion_r1932092974


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1764,6 +1760,193 @@ public void uploadToDeepStoreIfMissing(TableConfig 
tableConfig, List<SegmentZKMe
     }
   }
 
+  // Functional interface for upload attempts
+  @FunctionalInterface
+  private interface UploadAttempt {
+    void upload()
+        throws Exception;
+  }
+
+  private void uploadToDeepStoreWithFallback(URI uri, String segmentName, 
String rawTableName,
+      SegmentZKMetadata segmentZKMetadata, PinotFS pinotFS)
+      throws Exception {
+
+    // Define upload methods in order of preference
+    List<UploadAttempt> uploadAttempts = Arrays.asList(
+        // Primary method
+        () -> {
+          String serverUploadRequestUrl = getUploadUrl(uri, 
"uploadLLCSegmentToDeepStore");
+          LOGGER.info("Ask server to upload LLC segment {} to deep store by 
this path: {}", segmentName,
+              serverUploadRequestUrl);
+          SegmentZKMetadata uploadedMetadata = 
_fileUploadDownloadClient.uploadLLCToSegmentStoreWithZKMetadata(
+              serverUploadRequestUrl);
+          handleMetadataUpload(segmentName, rawTableName, segmentZKMetadata, 
uploadedMetadata, pinotFS);
+        },
+        // First fallback
+        () -> {
+          String serverUploadRequestUrl = getUploadUrl(uri, 
"uploadLLCSegment");
+          LOGGER.info("Ask server to upload LLC segment {} to deep store by 
this path: {}", segmentName,
+              serverUploadRequestUrl);
+          TableLLCSegmentUploadResponse response =
+              
_fileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl);
+          handleLLCUpload(segmentName, rawTableName, segmentZKMetadata, 
response, pinotFS);
+        },
+        // Legacy fallback
+        () -> {
+          String serverUploadRequestUrl = getUploadUrl(uri, "upload");
+          LOGGER.info("Ask server to upload LLC segment {} to deep store by 
this path: {}", segmentName,
+              serverUploadRequestUrl);
+
+          String tempUrl = 
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+          handleBasicUpload(segmentName, rawTableName, segmentZKMetadata, 
tempUrl, pinotFS);
+        }
+    );
+
+    // Try each method in sequence until one succeeds
+    Exception lastException = null;
+    for (UploadAttempt attempt : uploadAttempts) {
+      try {
+        attempt.upload();
+        return; // Success, exit the method
+      } catch (Exception e) {
+        lastException = e;
+        LOGGER.warn("Upload attempt failed for segment {}, trying next 
method", segmentName, e);
+      }
+    }
+
+    // All attempts for segment upload failed
+    throw new Exception("All upload attempts failed for segment " + 
segmentName, lastException);
+  }
+
+  private String getUploadUrl(URI uri, String endpoint) {
+    return String.format("%s/%s?uploadTimeoutMs=%d",
+        uri.toString(), endpoint, _deepstoreUploadRetryTimeoutMs);
+  }
+
+  private void handleMetadataUpload(String segmentName, String rawTableName, 
SegmentZKMetadata currentMetadata,
+      SegmentZKMetadata uploadedMetadata,
+      PinotFS pinotFS)
+      throws Exception {
+
+    String downloadUrl = moveSegmentFile(rawTableName, segmentName, 
uploadedMetadata.getDownloadUrl(), pinotFS);
+    LOGGER.info("Updating segment {} download url in ZK to be {}", 
segmentName, downloadUrl);
+    currentMetadata.setDownloadUrl(downloadUrl);
+
+    if (uploadedMetadata.getCrc() != currentMetadata.getCrc()) {
+      LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}", 
segmentName,
+          uploadedMetadata.getCrc(), currentMetadata.getCrc());
+      updateSegmentMetadata(currentMetadata, uploadedMetadata);
+    }
+  }
+
+  private void handleLLCUpload(String segmentName, String rawTableName,
+      SegmentZKMetadata currentMetadata, TableLLCSegmentUploadResponse 
response,
+      PinotFS pinotFS)
+      throws Exception {
+
+    String downloadUrl = moveSegmentFile(rawTableName, segmentName, 
response.getDownloadUrl(), pinotFS);
+    LOGGER.info("Updating segment {} download url in ZK to be {}", 
segmentName, downloadUrl);
+    currentMetadata.setDownloadUrl(downloadUrl);
+
+    if (response.getCrc() != currentMetadata.getCrc()) {
+      LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}", 
segmentName,
+          response.getCrc(), currentMetadata.getCrc());
+      currentMetadata.setCrc(response.getCrc());
+    }
+  }
+
+  private void handleBasicUpload(String segmentName, String rawTableName,
+      SegmentZKMetadata metadata, String tempDownloadUrl, PinotFS pinotFS)
+      throws Exception {
+
+    String downloadUrl = moveSegmentFile(rawTableName, segmentName, 
tempDownloadUrl, pinotFS);
+    metadata.setDownloadUrl(downloadUrl);
+  }
+
+  /**
+   * Updates the segment metadata in ZooKeeper with information from the 
uploaded segment.
+   *
+   * For pauseless consumption scenarios:
+   * - When segment status is COMMITTING, it indicates a previous segment 
commit metadata update failed
+   * - In this case, we perform a full metadata update including time 
boundaries, index details, and partition info
+   * - Finally, the segment status is marked as DONE to indicate successful 
completion
+   *
+   * For regular consumption:
+   * - Only the CRC value is updated
+   *
+   * @param segmentZKMetadata Current segment metadata stored in ZooKeeper 
that needs to be updated
+   * @param uploadedSegmentZKMetadata New metadata from the successfully 
uploaded segment
+   */
+  private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, 
SegmentZKMetadata uploadedSegmentZKMetadata) {
+    if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
+      LOGGER.info("Updating additional metadata in ZK for segment {} as 
pauseless is enabled",
+          segmentZKMetadata.getSegmentName());
+      
segmentZKMetadata.setStartTime(uploadedSegmentZKMetadata.getStartTimeMs());

Review Comment:
   I have updated the function to copy all the simpleFields. The 
SegmentZKMetadata returned by uploadLLCSegmentToDeepStore now also contains 
the previously missing fields such as `creationTime` and `numReplicas`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to