This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 96fdacbf72 replaced getServer with getServers (#12545)
96fdacbf72 is described below

commit 96fdacbf72989b514366ec6144ef8dde4942461d
Author: Robert Zych <[email protected]>
AuthorDate: Tue Mar 5 10:21:23 2024 -0800

    replaced getServer with getServers (#12545)
---
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 39 ++++++++++++++--------
 .../UpsertCompactionTaskExecutorTest.java          | 11 +++---
 2 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 97a572a939..485f5a118d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.plugin.minion.tasks;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
@@ -144,19 +146,26 @@ public class MinionTaskUtils {
     HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
     String clusterName = minionContext.getHelixManager().getClusterName();
 
-    String server = getServer(segmentName, tableNameWithType, helixAdmin, clusterName);
-    InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
-    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
-
-    // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
-    // passing an empty list.
-    ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
-    return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
-        validDocIdsType, 60_000);
+    List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
+    for (String server : servers) {
+      InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
+      String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+
+      // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
+      // passing an empty list.
+      ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
+      try {
+        return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
+                validDocIdsType, 60_000);
+      } catch (Exception e) {
+        LOGGER.info("Unable to retrieve validDocIdsBitmap for {} from {}", segmentName, endpoint);
+      }
+    }
+    throw new IllegalStateException("Unable to retrieve validDocIds for segment: " + segmentName);
   }
 
-  public static String getServer(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
-      String clusterName) {
+  public static List<String> getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
+                                        String clusterName) {
     ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
     if (externalView == null) {
       throw new IllegalStateException("External view does not exist for table: " + tableNameWithType);
@@ -165,11 +174,15 @@ public class MinionTaskUtils {
     if (instanceStateMap == null) {
       throw new IllegalStateException("Failed to find segment: " + segmentName);
     }
+    ArrayList<String> servers = new ArrayList<>();
     for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
       if (entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
-        return entry.getKey();
+        servers.add(entry.getKey());
       }
     }
-    throw new IllegalStateException("Failed to find ONLINE server for segment: " + segmentName);
+    if (servers.isEmpty()) {
+      throw new IllegalStateException("Failed to find any ONLINE servers for segment: " + segmentName);
+    }
+    return servers;
   }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
index 0869880ccf..741a98fa92 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
@@ -37,7 +38,7 @@ public class UpsertCompactionTaskExecutorTest {
   private static final String CLUSTER_NAME = "testCluster";
 
   @Test
-  public void testGetServer() {
+  public void testGetServers() {
     ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
     Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
     Map<String, String> map = new HashMap<>();
@@ -52,15 +53,15 @@ public class UpsertCompactionTaskExecutorTest {
     Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(clusterManagementTool);
     minionContext.setHelixManager(helixManager);
 
-    String server = MinionTaskUtils.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME, helixManager.getClusterManagmentTool(),
-        helixManager.getClusterName());
+    List<String> servers = MinionTaskUtils.getServers(SEGMENT_NAME, REALTIME_TABLE_NAME,
+        helixManager.getClusterManagmentTool(), helixManager.getClusterName());
 
-    Assert.assertEquals(server, "server1");
+    Assert.assertEquals(servers.get(0), "server1");
 
     // verify exception thrown with OFFLINE server
     map.put("server1", SegmentStateModel.OFFLINE);
     Assert.assertThrows(IllegalStateException.class,
-        () -> MinionTaskUtils.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME,
+        () -> MinionTaskUtils.getServers(SEGMENT_NAME, REALTIME_TABLE_NAME,
             helixManager.getClusterManagmentTool(), helixManager.getClusterName()));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to