swaminathanmanish commented on code in PR #12275:
URL: https://github.com/apache/pinot/pull/12275#discussion_r1456496686


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -180,27 +199,126 @@ public List<String> getSegmentMetadataFromServer(String 
tableNameWithType,
     return segmentsMetadata;
   }
 
+  /**
+   * This method is called when the API request is to fetch validDocId 
metadata for a list segments of the given table.
+   * This method will pick a server that hosts the target segment and fetch 
the segment metadata result.
+   *
+   * @return segment metadata as a JSON string
+   */
+  public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String 
tableNameWithType,
+      Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
+      @Nullable List<String> segmentNames, int timeoutMs) {
+    List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
+      List<String> segmentsForServer = serverToSegments.getValue();
+      List<String> segmentsToQuery = new ArrayList<>();
+      for (String segment : segmentsForServer) {
+        if (segmentNames == null) {
+          // If segmentNames is null, query all segments
+          segmentsToQuery.add(segment);
+        } else if (segmentNames.contains(segment)) {

Review Comment:
   Consider using a set instead of a list for `segmentNames`? `List.contains` is a 
linear scan, so with a large number of segments this loop gets expensive.
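
A minimal sketch of what I mean, assuming the filtering is pulled into a helper. `SegmentFilterSketch` and `selectSegments` are hypothetical names for illustration, not something in this PR:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SegmentFilterSketch {
  // Hypothetical helper (not part of the PR): picks the segments hosted on one
  // server that should be queried. A null segmentNames means "query everything".
  static List<String> selectSegments(List<String> segmentsForServer, List<String> segmentNames) {
    if (segmentNames == null) {
      return new ArrayList<>(segmentsForServer);
    }
    // Copy into a HashSet so each membership check is O(1) on average
    // instead of a linear scan over the list.
    Set<String> requested = new HashSet<>(segmentNames);
    List<String> segmentsToQuery = new ArrayList<>();
    for (String segment : segmentsForServer) {
      if (requested.contains(segment)) {
        segmentsToQuery.add(segment);
      }
    }
    return segmentsToQuery;
  }
}
```

In the real method the set would be built once, before the loop over servers, rather than once per server as in this standalone sketch.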



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -180,27 +199,126 @@ public List<String> getSegmentMetadataFromServer(String 
tableNameWithType,
     return segmentsMetadata;
   }
 
+  /**
+   * This method is called when the API request is to fetch validDocId 
metadata for a list segments of the given table.
+   * This method will pick a server that hosts the target segment and fetch 
the segment metadata result.
+   *
+   * @return segment metadata as a JSON string
+   */
+  public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String 
tableNameWithType,
+      Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
+      @Nullable List<String> segmentNames, int timeoutMs) {
+    List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
+      List<String> segmentsForServer = serverToSegments.getValue();
+      List<String> segmentsToQuery = new ArrayList<>();
+      for (String segment : segmentsForServer) {
+        if (segmentNames == null) {
+          // If segmentNames is null, query all segments
+          segmentsToQuery.add(segment);
+        } else if (segmentNames.contains(segment)) {
+          segmentsToQuery.add(segment);
+        }
+      }
+      serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, 
segmentsToQuery,
+          serverToEndpoints.get(serverToSegments.getKey())));
+    }
+
+    // request the urls from the servers
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverToEndpoints);
+
+    Map<String, String> requestHeaders = Map.of("Content-Type", 
"application/json");
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, false, requestHeaders,
+            timeoutMs, null);
+
+    List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>();
+    int failedParses = 0;
+    int returnedSegmentsCount = 0;
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        String validDocIdMetadataList = streamResponse.getValue();
+        List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
+            JsonUtils.stringToObject(validDocIdMetadataList, new 
TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+            });
+        validDocIdMetadataInfos.addAll(validDocIdMetadataInfo);
+        returnedSegmentsCount++;
+      } catch (Exception e) {
+        failedParses++;
+        LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
+      }
+    }
+    if (failedParses != 0) {
+      LOGGER.error("Unable to parse server {} / {} response due to an error: 
", failedParses,
+          serverURLsAndBodies.size());
+    }
+
+    if (segmentNames != null && returnedSegmentsCount != segmentNames.size()) {
+      LOGGER.error("Unable to get validDocIdMetadata from all servers. 
Expected: {}, Actual: {}", segmentNames.size(),
+          returnedSegmentsCount);
+    }
+    LOGGER.debug("Retrieved segment metadata from servers.");

Review Comment:
   Should this be logged at info level? Also, can you record the size of the 
segment list? It might be good to track the time it took to fetch as well.
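
Roughly what I have in mind, as a sketch only. `FetchTimingSketch` and `summary` are hypothetical names, and the message wording is just a suggestion; in the real method the resulting string (or the equivalent `{}` placeholders) would go to `LOGGER.info`, with the start time captured via `System.nanoTime()` before the requests are sent:

```java
import java.util.concurrent.TimeUnit;

public class FetchTimingSketch {
  // Hypothetical helper: builds the log line from the number of segments returned,
  // the number of servers queried, and nanoTime readings taken before and after the fetch.
  static String summary(int segmentCount, int serverCount, long startNanos, long endNanos) {
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
    return String.format("Retrieved validDocId metadata for %d segments from %d servers in %d ms",
        segmentCount, serverCount, elapsedMs);
  }
}
```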



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -47,6 +61,11 @@ public class ServerSegmentMetadataReader {
   private final Executor _executor;
   private final HttpClientConnectionManager _connectionManager;
 
+  public ServerSegmentMetadataReader() {
+    _executor = Executors.newFixedThreadPool(1);

Review Comment:
   Is a single thread enough? I'm assuming these are quick calls to the server.
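
To illustrate the concern: with a fixed pool of one thread, tasks submitted to the executor run one at a time, so if the per-server requests are executed on this pool they would be serialized. The sketch below is standalone and uses a sleep as a stand-in for a server call; `PoolSizeSketch` and `maxConcurrent` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizeSketch {
  // Runs numCalls simulated server calls on a fixed pool of poolSize threads and
  // returns the highest number of calls that were in flight at the same time.
  static int maxConcurrent(int poolSize, int numCalls) {
    ExecutorService executor = Executors.newFixedThreadPool(poolSize);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < numCalls; i++) {
        futures.add(executor.submit(() -> {
          int now = inFlight.incrementAndGet();
          maxSeen.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(20); // stand-in for the HTTP round trip
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          inFlight.decrementAndGet();
        }));
      }
      for (Future<?> future : futures) {
        future.get(); // wait for every simulated call to finish
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdown();
    }
    return maxSeen.get();
  }
}
```

With `poolSize` set to 1 the result is always 1, so total latency grows with the number of servers. Sizing the pool closer to the number of servers, or reusing an existing shared executor, would let the calls overlap.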



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -180,27 +199,126 @@ public List<String> getSegmentMetadataFromServer(String 
tableNameWithType,
     return segmentsMetadata;
   }
 
+  /**
+   * This method is called when the API request is to fetch validDocId 
metadata for a list segments of the given table.
+   * This method will pick a server that hosts the target segment and fetch 
the segment metadata result.
+   *
+   * @return segment metadata as a JSON string
+   */
+  public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String 
tableNameWithType,
+      Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
+      @Nullable List<String> segmentNames, int timeoutMs) {
+    List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
+      List<String> segmentsForServer = serverToSegments.getValue();
+      List<String> segmentsToQuery = new ArrayList<>();
+      for (String segment : segmentsForServer) {
+        if (segmentNames == null) {
+          // If segmentNames is null, query all segments
+          segmentsToQuery.add(segment);
+        } else if (segmentNames.contains(segment)) {
+          segmentsToQuery.add(segment);
+        }
+      }
+      serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, 
segmentsToQuery,
+          serverToEndpoints.get(serverToSegments.getKey())));
+    }
+
+    // request the urls from the servers
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverToEndpoints);
+
+    Map<String, String> requestHeaders = Map.of("Content-Type", 
"application/json");
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, false, requestHeaders,
+            timeoutMs, null);
+
+    List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>();
+    int failedParses = 0;
+    int returnedSegmentsCount = 0;
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        String validDocIdMetadataList = streamResponse.getValue();

Review Comment:
   To clarify: if the HTTP call to a server fails, what will `streamResponse` 
contain?
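
If the answer is that failed calls simply have no entry in `_httpResponses` (that is my assumption, I have not verified it), then the loop never sees them and it may be worth reporting which servers are missing. A sketch under that assumption, and under the further assumption that the map is keyed by the same endpoint strings that were queried; `MissingResponseSketch` and `missingEndpoints` are hypothetical names:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MissingResponseSketch {
  // Hypothetical helper: returns the endpoints that were queried but have no entry
  // in the response map, sorted so the resulting log output is stable.
  static Set<String> missingEndpoints(Set<String> queriedEndpoints, Map<String, String> httpResponses) {
    Set<String> missing = new TreeSet<>(queriedEndpoints);
    missing.removeAll(httpResponses.keySet());
    return missing;
  }
}
```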
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

