This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0ce68b916 Fix bug to return validDocIDsMetadata from all servers 
(#12431)
d0ce68b916 is described below

commit d0ce68b9163399f056f424bee9b7bc713fe33ee5
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Wed Feb 21 06:02:37 2024 +0530

    Fix bug to return validDocIDsMetadata from all servers (#12431)
    
    * Fix bug to return validDocIDsMetadata from all servers
    
    * Deduping validDocIdsMetadata response
---
 .../controller/util/ServerSegmentMetadataReader.java      | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index cc8ea07153..320649c0d1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -239,31 +239,36 @@ public class ServerSegmentMetadataReader {
           serverToEndpoints.get(serverToSegments.getKey())));
     }
 
+    BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
+
     // request the urls from the servers
     CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverToEndpoints);
+        new CompletionServiceHelper(_executor, _connectionManager, 
endpointsToServers);
 
     Map<String, String> requestHeaders = Map.of("Content-Type", 
"application/json");
     CompletionServiceHelper.CompletionServiceResponse serviceResponse =
         completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, false, requestHeaders,
             timeoutMs, null);
 
-    List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new ArrayList<>();
+    Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new 
HashMap<>();
     int failedParses = 0;
     int returnedServersCount = 0;
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       try {
         String validDocIdsMetadataList = streamResponse.getValue();
-        List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
+        List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList =
             JsonUtils.stringToObject(validDocIdsMetadataList, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
             });
-        validDocIdsMetadataInfos.addAll(validDocIdsMetadataInfo);
+        for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: 
validDocIdsMetadataInfoList) {
+          
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), 
validDocIdsMetadataInfo);
+        }
         returnedServersCount++;
       } catch (Exception e) {
         failedParses++;
         LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
       }
     }
+
     if (failedParses != 0) {
       LOGGER.error("Unable to parse server {} / {} response due to an error: 
", failedParses,
           serverURLsAndBodies.size());
@@ -281,7 +286,7 @@ public class ServerSegmentMetadataReader {
 
     LOGGER.info("Retrieved validDocIds metadata for {} segments from {} 
servers.", validDocIdsMetadataInfos.size(),
         returnedServersCount);
-    return validDocIdsMetadataInfos;
+    return new ArrayList<>(validDocIdsMetadataInfos.values());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to