This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 82b7d187ecb Add ideal state replica ID information for query routing (#17619)
82b7d187ecb is described below
commit 82b7d187ecb9288e2cef949bc0536278af285657
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 5 09:56:35 2026 -0800
Add ideal state replica ID information for query routing (#17619)
---
.../instanceselector/BaseInstanceSelector.java | 37 +++++++++++++++-------
.../ReplicaGroupInstanceSelector.java | 18 ++++++++---
.../instanceselector/SegmentInstanceCandidate.java | 12 ++++++-
3 files changed, 49 insertions(+), 18 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 45f95f7e41b..dcd4422fc97 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -85,7 +85,7 @@ import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID
public abstract class BaseInstanceSelector implements InstanceSelector {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
// To prevent int overflow, reset the request id once it reaches this value
- private static final long MAX_REQUEST_ID = 1_000_000_000;
+ protected static final long MAX_REQUEST_ID = 1_000_000_000;
protected TableConfig _tableConfig;
protected String _tableNameWithType;
@@ -101,15 +101,15 @@ public abstract class BaseInstanceSelector implements
InstanceSelector {
protected int _tableNameHashForFixedReplicaRouting;
// These 3 variables are the cached states to help accelerate the change processing
- Set<String> _enabledInstances;
+ protected Set<String> _enabledInstances;
// For old segments, all candidates are online
// Reduce this map to reduce garbage
- final Map<String, List<SegmentInstanceCandidate>> _oldSegmentCandidatesMap =
new HashMap<>();
- Map<String, NewSegmentState> _newSegmentStateMap;
+ protected final Map<String, List<SegmentInstanceCandidate>>
_oldSegmentCandidatesMap = new HashMap<>();
+ protected Map<String, NewSegmentState> _newSegmentStateMap;
// _segmentStates is needed for instance selection (multi-threaded), so it is made volatile.
- private volatile SegmentStates _segmentStates;
- private Map<String, ServerInstance> _enabledServerStore;
+ protected volatile SegmentStates _segmentStates;
+ protected Map<String, ServerInstance> _enabledServerStore;
@Override
public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord>
propertyStore,
@@ -261,17 +261,22 @@ public abstract class BaseInstanceSelector implements
InstanceSelector {
Set<Integer> pools = new HashSet<>();
for (String segment : onlineSegments) {
Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ // TODO: Verify whether sorting is actually needed
+ Map<String, String> sortedIdealStateMap =
convertToSortedMap(idealStateInstanceStateMap);
Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
Map<String, String> externalViewInstanceStateMap =
externalViewAssignment.get(segment);
+
if (externalViewInstanceStateMap == null) {
if (newSegmentCreationTimeMs != null) {
// New segment
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
- for (Map.Entry<String, String> entry :
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+ int idealStateReplicaId = 0;
+ for (Map.Entry<String, String> entry :
sortedIdealStateMap.entrySet()) {
if (isOnlineForRouting(entry.getValue())) {
String instance = entry.getKey();
- candidates.add(new SegmentInstanceCandidate(instance, false,
getPool(instance)));
+ candidates.add(new SegmentInstanceCandidate(instance, false,
getPool(instance), idealStateReplicaId));
}
+ idealStateReplicaId++;
}
_newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMs, candidates));
} else {
@@ -283,19 +288,26 @@ public abstract class BaseInstanceSelector implements
InstanceSelector {
if (newSegmentCreationTimeMs != null) {
// New segment
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
- for (Map.Entry<String, String> entry :
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+ int idealStateReplicaId = 0;
+ for (Map.Entry<String, String> entry :
sortedIdealStateMap.entrySet()) {
if (isOnlineForRouting(entry.getValue())) {
String instance = entry.getKey();
candidates.add(
- new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance), getPool(instance)));
+ new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance), getPool(instance),
+ idealStateReplicaId));
}
+ idealStateReplicaId++;
}
_newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMs, candidates));
} else {
// Old segment
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(onlineInstances.size());
- for (String instance : onlineInstances) {
- candidates.add(new SegmentInstanceCandidate(instance, true,
getPool(instance)));
+ int idealStateReplicaId = 0;
+ for (String instance : sortedIdealStateMap.keySet()) {
+ if (onlineInstances.contains(instance)) {
+ candidates.add(new SegmentInstanceCandidate(instance, true,
getPool(instance), idealStateReplicaId));
+ }
+ idealStateReplicaId++;
}
_oldSegmentCandidatesMap.put(segment, candidates);
}
@@ -457,6 +469,7 @@ public abstract class BaseInstanceSelector implements
InstanceSelector {
Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap =
select(segments, requestIdInt, segmentStates, queryOptions);
Set<String> unavailableSegments = segmentStates.getUnavailableSegments();
+
if (unavailableSegments.isEmpty()) {
return new SelectionResult(segmentToInstanceMap,
Collections.emptyList(), 0);
} else {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index ab14a4d9bbc..56a51a610e5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -247,12 +247,15 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
// NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
Set<String> onlineInstances = entry.getValue();
Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+
Set<String> unavailableInstances =
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(onlineInstances.size());
- for (String instance : onlineInstances) {
- if (!unavailableInstances.contains(instance)) {
- candidates.add(new SegmentInstanceCandidate(instance, true,
getPool(instance)));
+ int idealStateReplicaId = 0;
+ for (String instance :
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+ if (onlineInstances.contains(instance) &&
!unavailableInstances.contains(instance)) {
+ candidates.add(new SegmentInstanceCandidate(instance, true,
getPool(instance), idealStateReplicaId));
}
+ idealStateReplicaId++;
}
_oldSegmentCandidatesMap.put(segment, candidates);
}
@@ -261,13 +264,18 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
String segment = entry.getKey();
Set<String> onlineInstances = entry.getValue();
Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ Map<String, String> sortedIdealStateInstanceStateMap =
convertToSortedMap(idealStateInstanceStateMap);
+
Set<String> unavailableInstances =
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(),
Collections.emptySet());
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
- for (String instance :
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+ int idealStateReplicaId = 0;
+ for (String instance : sortedIdealStateInstanceStateMap.keySet()) {
if (!unavailableInstances.contains(instance)) {
- candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance), getPool(instance)));
+ candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance), getPool(instance),
+ idealStateReplicaId));
}
+ idealStateReplicaId++;
}
_newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
index 7b251eb4397..f90b573e1e6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
@@ -32,6 +32,10 @@ public class SegmentInstanceCandidate {
private final String _instance;
private final boolean _online;
private final int _pool;
+ // Represents the index of the server in the ideal state assignment for the segment. For example, if the ideal state
+ // assignment for a segment is [server1, server2, server3] and this candidate represents server2, then the
+ // idealStateReplicaId will be 1 (regardless of which replicas are available/online).
+ private final int _idealStateReplicaId;
@VisibleForTesting
public SegmentInstanceCandidate(String instance, boolean online) {
@@ -39,12 +43,14 @@ public class SegmentInstanceCandidate {
_online = online;
// no group
_pool = FALLBACK_POOL_ID;
+ _idealStateReplicaId = -1;
}
- public SegmentInstanceCandidate(String instance, boolean online, int pool) {
+ public SegmentInstanceCandidate(String instance, boolean online, int pool,
int idealStateReplicaId) {
_instance = instance;
_online = online;
_pool = pool;
+ _idealStateReplicaId = idealStateReplicaId;
}
public String getInstance() {
@@ -59,6 +65,10 @@ public class SegmentInstanceCandidate {
return _pool;
}
+ public int getIdealStateReplicaId() {
+ return _idealStateReplicaId;
+ }
+
@Override
public String toString() {
return "SegmentInstanceCandidate{" + "_instance='" + _instance + '\'' + ",
_online=" + _online + ", _pool=" + _pool
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]