chenboat commented on a change in pull request #6778:
URL: https://github.com/apache/pinot/pull/6778#discussion_r708483222



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1297,110 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   *
+   * TODO: Add an on-demand way to upload LLC segment to deep store for a specific table.
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName);
+    for (String segmentName : segmentNames) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // Only fix the committed llc segment without deep store copy
+        if (segmentZKMetadata.getStatus() != Status.DONE

Review comment:
       Changed to == Status.IN_PROGRESS, because other statuses that indicate a completed segment were recently added (e.g., UPLOADED).
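
       To illustrate the difference between the two checks, here is a minimal, self-contained sketch. The enum below is a hypothetical stand-in that only mirrors the status names mentioned in this thread, and both helper methods are illustrative names, not code from the PR. It assumes the guard is used to skip a segment; the rest of the original condition is not shown in the quoted diff and is not reproduced here.

```java
public class StatusCheckSketch {
  // Hypothetical stand-in for the segment status enum discussed above.
  enum Status { IN_PROGRESS, DONE, UPLOADED }

  // Earlier form of the guard: skip every segment whose status is not exactly DONE.
  // Any newly added "completed" status (such as UPLOADED) is skipped as well.
  static boolean skipWithNotDone(Status status) {
    return status != Status.DONE;
  }

  // Revised form of the guard: skip only segments that are still in progress,
  // so every completed status remains eligible for the fix.
  static boolean skipWithInProgress(Status status) {
    return status == Status.IN_PROGRESS;
  }

  public static void main(String[] args) {
    // Print how each guard treats each status.
    for (Status s : Status.values()) {
      System.out.println(s + ": notDoneGuardSkips=" + skipWithNotDone(s)
          + ", inProgressGuardSkips=" + skipWithInProgress(s));
    }
  }
}
```

       The two guards agree on IN_PROGRESS and DONE and differ only on statuses added later, which is the case this comment is about.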




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


