This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b7d187ecb Add ideal state replica ID information for query routing (#17619)
82b7d187ecb is described below

commit 82b7d187ecb9288e2cef949bc0536278af285657
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 5 09:56:35 2026 -0800

    Add ideal state replica ID information for query routing (#17619)
---
 .../instanceselector/BaseInstanceSelector.java     | 37 +++++++++++++++-------
 .../ReplicaGroupInstanceSelector.java              | 18 ++++++++---
 .../instanceselector/SegmentInstanceCandidate.java | 12 ++++++-
 3 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 45f95f7e41b..dcd4422fc97 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -85,7 +85,7 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID
 public abstract class BaseInstanceSelector implements InstanceSelector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
   // To prevent int overflow, reset the request id once it reaches this value
-  private static final long MAX_REQUEST_ID = 1_000_000_000;
+  protected static final long MAX_REQUEST_ID = 1_000_000_000;
 
   protected TableConfig _tableConfig;
   protected String _tableNameWithType;
@@ -101,15 +101,15 @@ public abstract class BaseInstanceSelector implements 
InstanceSelector {
   protected int _tableNameHashForFixedReplicaRouting;
 
   // These 3 variables are the cached states to help accelerate the change 
processing
-  Set<String> _enabledInstances;
+  protected Set<String> _enabledInstances;
   // For old segments, all candidates are online
   // Reduce this map to reduce garbage
-  final Map<String, List<SegmentInstanceCandidate>> _oldSegmentCandidatesMap = 
new HashMap<>();
-  Map<String, NewSegmentState> _newSegmentStateMap;
+  protected final Map<String, List<SegmentInstanceCandidate>> 
_oldSegmentCandidatesMap = new HashMap<>();
+  protected Map<String, NewSegmentState> _newSegmentStateMap;
 
   // _segmentStates is needed for instance selection (multi-threaded), so it 
is made volatile.
-  private volatile SegmentStates _segmentStates;
-  private Map<String, ServerInstance> _enabledServerStore;
+  protected volatile SegmentStates _segmentStates;
+  protected Map<String, ServerInstance> _enabledServerStore;
 
   @Override
   public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> 
propertyStore,
@@ -261,17 +261,22 @@ public abstract class BaseInstanceSelector implements 
InstanceSelector {
     Set<Integer> pools = new HashSet<>();
     for (String segment : onlineSegments) {
       Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      // TODO: Verify whether sorting is actually needed
+      Map<String, String> sortedIdealStateMap = 
convertToSortedMap(idealStateInstanceStateMap);
       Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
       Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
+
       if (externalViewInstanceStateMap == null) {
         if (newSegmentCreationTimeMs != null) {
           // New segment
           List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-          for (Map.Entry<String, String> entry : 
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+          int idealStateReplicaId = 0;
+          for (Map.Entry<String, String> entry : 
sortedIdealStateMap.entrySet()) {
             if (isOnlineForRouting(entry.getValue())) {
               String instance = entry.getKey();
-              candidates.add(new SegmentInstanceCandidate(instance, false, 
getPool(instance)));
+              candidates.add(new SegmentInstanceCandidate(instance, false, 
getPool(instance), idealStateReplicaId));
             }
+            idealStateReplicaId++;
           }
           _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMs, candidates));
         } else {
@@ -283,19 +288,26 @@ public abstract class BaseInstanceSelector implements 
InstanceSelector {
         if (newSegmentCreationTimeMs != null) {
           // New segment
           List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-          for (Map.Entry<String, String> entry : 
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+          int idealStateReplicaId = 0;
+          for (Map.Entry<String, String> entry : 
sortedIdealStateMap.entrySet()) {
             if (isOnlineForRouting(entry.getValue())) {
               String instance = entry.getKey();
               candidates.add(
-                  new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance), getPool(instance)));
+                  new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance), getPool(instance),
+                      idealStateReplicaId));
             }
+            idealStateReplicaId++;
           }
           _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMs, candidates));
         } else {
           // Old segment
           List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(onlineInstances.size());
-          for (String instance : onlineInstances) {
-            candidates.add(new SegmentInstanceCandidate(instance, true, 
getPool(instance)));
+          int idealStateReplicaId = 0;
+          for (String instance : sortedIdealStateMap.keySet()) {
+            if (onlineInstances.contains(instance)) {
+              candidates.add(new SegmentInstanceCandidate(instance, true, 
getPool(instance), idealStateReplicaId));
+            }
+            idealStateReplicaId++;
           }
           _oldSegmentCandidatesMap.put(segment, candidates);
         }
@@ -457,6 +469,7 @@ public abstract class BaseInstanceSelector implements 
InstanceSelector {
     Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap =
         select(segments, requestIdInt, segmentStates, queryOptions);
     Set<String> unavailableSegments = segmentStates.getUnavailableSegments();
+
     if (unavailableSegments.isEmpty()) {
       return new SelectionResult(segmentToInstanceMap, 
Collections.emptyList(), 0);
     } else {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index ab14a4d9bbc..56a51a610e5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -247,12 +247,15 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
       // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
       Set<String> onlineInstances = entry.getValue();
       Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+
       Set<String> unavailableInstances = 
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
       List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(onlineInstances.size());
-      for (String instance : onlineInstances) {
-        if (!unavailableInstances.contains(instance)) {
-          candidates.add(new SegmentInstanceCandidate(instance, true, 
getPool(instance)));
+      int idealStateReplicaId = 0;
+      for (String instance : 
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+        if (onlineInstances.contains(instance) && 
!unavailableInstances.contains(instance)) {
+          candidates.add(new SegmentInstanceCandidate(instance, true, 
getPool(instance), idealStateReplicaId));
         }
+        idealStateReplicaId++;
       }
       _oldSegmentCandidatesMap.put(segment, candidates);
     }
@@ -261,13 +264,18 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
       String segment = entry.getKey();
       Set<String> onlineInstances = entry.getValue();
       Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Map<String, String> sortedIdealStateInstanceStateMap = 
convertToSortedMap(idealStateInstanceStateMap);
+
       Set<String> unavailableInstances =
           
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), 
Collections.emptySet());
       List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-      for (String instance : 
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+      int idealStateReplicaId = 0;
+      for (String instance : sortedIdealStateInstanceStateMap.keySet()) {
         if (!unavailableInstances.contains(instance)) {
-          candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance), getPool(instance)));
+          candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance), getPool(instance),
+              idealStateReplicaId));
         }
+        idealStateReplicaId++;
       }
       _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
     }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
index 7b251eb4397..f90b573e1e6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
@@ -32,6 +32,10 @@ public class SegmentInstanceCandidate {
   private final String _instance;
   private final boolean _online;
   private final int _pool;
+  // Represents the index of the server in the ideal state assignment for the 
segment. For example, if the ideal state
+  // assignment for a segment is [server1, server2, server3] and this 
candidate represents server2, then the
+  // idealStateReplicaId will be 1 (regardless of which replicas are 
available/online).
+  private final int _idealStateReplicaId;
 
   @VisibleForTesting
   public SegmentInstanceCandidate(String instance, boolean online) {
@@ -39,12 +43,14 @@ public class SegmentInstanceCandidate {
     _online = online;
     // no group
     _pool = FALLBACK_POOL_ID;
+    _idealStateReplicaId = -1;
   }
 
-  public SegmentInstanceCandidate(String instance, boolean online, int pool) {
+  public SegmentInstanceCandidate(String instance, boolean online, int pool, 
int idealStateReplicaId) {
     _instance = instance;
     _online = online;
     _pool = pool;
+    _idealStateReplicaId = idealStateReplicaId;
   }
 
   public String getInstance() {
@@ -59,6 +65,10 @@ public class SegmentInstanceCandidate {
     return _pool;
   }
 
+  public int getIdealStateReplicaId() {
+    return _idealStateReplicaId;
+  }
+
   @Override
   public String toString() {
     return "SegmentInstanceCandidate{" + "_instance='" + _instance + '\'' + ", 
_online=" + _online + ", _pool=" + _pool


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to