yupeng9 commented on a change in pull request #6778:
URL: https://github.com/apache/pinot/pull/6778#discussion_r696978474



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -797,6 +797,30 @@ public SimpleHttpResponse uploadSegment(URI uri, String 
segmentName, InputStream
     return uploadSegment(uri, segmentName, inputStream, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Used by controllers to send requests to servers:
+   * Controller periodic task uses this endpoint to ask servers to upload 
committed llc segment to segment store if missing.
+   * @param uri The uri to ask servers to upload segment to segment store
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public String uploadToSegmentStore(String uri)
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    RequestBuilder requestBuilder = RequestBuilder.post(new 
URI(uri)).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DEFAULT_SOCKET_TIMEOUT_MS);
+    // sendRequest checks the response status code
+    SimpleHttpResponse response = sendRequest(requestBuilder.build());
+    String downloadUrl = response.getResponse();
+    if (downloadUrl.isEmpty()) {
+      throw new HttpErrorStatusException(

Review comment:
       We need to emit a metric when this request fails.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
##########
@@ -126,6 +126,12 @@
     public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
         "controller.segmentRelocator.initialDelayInSeconds";
 
+    // configs for uploading missing LLC segments copy to segment store
+    public static final String 
ENABLE_UPLOAD_MISSING_LLC_SEGMENT_TO_SEGMENT_STORE =
+        "controller.realtime.segment.uploadToSegmentStoreIfMissing";
+    public static final String 
VALIDATION_RANGE_IN_DAYS_TO_CHECK_MISSING_SEGMENT_STORE_COPY =

Review comment:
       What's this config for?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1228,4 +1301,146 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access. 
Validate segment creation time from segment name.

Review comment:
       We should have a way to retrieve older segments without having to modify 
this config and redeploy.

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -797,6 +797,30 @@ public SimpleHttpResponse uploadSegment(URI uri, String 
segmentName, InputStream
     return uploadSegment(uri, segmentName, inputStream, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Used by controllers to send requests to servers:
+   * Controller periodic task uses this endpoint to ask servers to upload 
committed llc segment to segment store if missing.
+   * @param uri The uri to ask servers to upload segment to segment store
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public String uploadToSegmentStore(String uri)
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    RequestBuilder requestBuilder = RequestBuilder.post(new 
URI(uri)).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DEFAULT_SOCKET_TIMEOUT_MS);
+    // sendRequest checks the response status code
+    SimpleHttpResponse response = sendRequest(requestBuilder.build());
+    String downloadUrl = response.getResponse();
+    if (downloadUrl.isEmpty()) {
+      throw new HttpErrorStatusException(

Review comment:
       Also, should we check the error message and log it if the request fails?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -137,9 +160,20 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  /**
+   * Map caching the LLC segment names that are missing deep store download 
uri in segment metadata.
+   * Controller gets the LLC segment names from this map, and asks servers to 
upload the segments to deep store.
+   * This helps to alleviate excessive ZK access when fetching LLC segment 
list.

Review comment:
       Nit: "helps alleviate" (drop the "to").

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1228,4 +1301,146 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access. 
Validate segment creation time from segment name.
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+          if (currentTimeMs - llcSegmentName.getCreationTimeMs() > 
_validationRangeForLLCSegmentsDeepStoreCopyMs) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);

Review comment:
       We should have a way to rate-limit these requests so that the servers 
are not overwhelmed by the upload activity.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to