This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b4e03b1252 Minor cleanups and refactors in instance selector module 
(#16744)
4b4e03b1252 is described below

commit 4b4e03b1252c5333c47de63f3567bf6e7e89cc07
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Sep 4 09:12:14 2025 +0530

    Minor cleanups and refactors in instance selector module (#16744)
---
 .../ServerSelectionContext.java                    |  12 ++-
 .../instanceselector/BalancedInstanceSelector.java | 103 +++++++++------------
 .../instanceselector/BaseInstanceSelector.java     |  94 ++++++++-----------
 .../instanceselector/InstanceSelectorFactory.java  |   4 +-
 .../MultiStageReplicaGroupSelector.java            |   4 +-
 .../ReplicaGroupInstanceSelector.java              |  92 ++++++------------
 .../instanceselector/SegmentInstanceCandidate.java |   6 ++
 .../routing/instanceselector/SegmentStates.java    |  19 ++--
 .../apache/pinot/common/metrics/BrokerMeter.java   |  10 +-
 9 files changed, 149 insertions(+), 195 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
index 1132b2c4b25..dc17ea93a3e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
@@ -21,6 +21,7 @@ package 
org.apache.pinot.broker.routing.adaptiveserverselector;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.broker.routing.instanceselector.InstanceSelectorConfig;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 
 
@@ -29,6 +30,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
  * servers are selected for query execution.
  */
 public class ServerSelectionContext {
+  private final InstanceSelectorConfig _config;
   /**
    * Map of query options that can influence server selection behavior.
    * These options are passed into the context class to avoid endless 
constructor argument changes
@@ -44,6 +46,7 @@ public class ServerSelectionContext {
   private final Map<String, String> _queryOptions;
   // If some query options need further processing, store the parsing result 
below to avoid duplicate parsing.
   private final List<Integer> _orderedPreferredPools;
+  private final boolean _isUseFixedReplica;
 
   /**
    * Creates a new server selection context with the given query options.
@@ -52,15 +55,22 @@ public class ServerSelectionContext {
    *
    * @param queryOptions map of query options that may contain server 
selection preferences
    */
-  public ServerSelectionContext(Map<String, String> queryOptions) {
+  public ServerSelectionContext(Map<String, String> queryOptions, 
InstanceSelectorConfig instanceSelectorConfig) {
     _queryOptions = queryOptions == null ? Collections.emptyMap() : 
queryOptions;
+    _config = instanceSelectorConfig;
     _orderedPreferredPools = 
QueryOptionsUtils.getOrderedPreferredPools(_queryOptions);
+    Boolean isUseFixedReplica = 
QueryOptionsUtils.isUseFixedReplica(_queryOptions);
+    _isUseFixedReplica = isUseFixedReplica != null ? isUseFixedReplica : 
instanceSelectorConfig.isUseFixedReplica();
   }
 
   public Map<String, String> getQueryOptions() {
     return _queryOptions;
   }
 
+  public boolean isUseFixedReplica() {
+    return _isUseFixedReplica;
+  }
+
   public List<Integer> getOrderedPreferredPools() {
     return _orderedPreferredPools;
   }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 35fe7ef002f..4afd3a26dcd 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -32,21 +32,21 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.HashUtil;
 
-/**
- * Instance selector to balance the number of segments served by each selected 
server instance.
- * <p>If AdaptiveServerSelection is enabled, the request is routed to the best 
available server for a segment
- * when it is processed below. This is a best effort approach in distributing 
the query to all available servers.
- * If some servers are performing poorly, they might not end up being picked 
for any of the segments. For example,
- * there's a query for Segments 1 (Seg1), 2 (Seg2) and Seg3). The servers are 
S1, S2, S3. The algorithm works as
- * follows:
- *    Step1: Process seg1. Fetch server rankings. Pick the best server.
- *    Step2: Process seg2. Fetch server rankings (could have changed or not 
since Step 1). Pick the best server.
- *    Step3: Process seg3. Fetch server rankings (could have changed or not 
since Step 2). Pick the best server.
- *
- * <p>If AdaptiveServerSelection is disabled, the selection algorithm will 
always evenly distribute the traffic to all
- * replicas of each segment, and will try to select different replica id for 
each segment. The algorithm is very
- * light-weight and will do best effort to balance the number of segments 
served by each selected server instance.
- */
+/// Instance selector to balance the number of segments served by each 
selected server instance.
+///
+/// If AdaptiveServerSelection is enabled, the request is routed to the best 
available server for a segment
+/// when it is processed below. This is a best effort approach in distributing 
the query to all available servers.
+/// If some servers are performing poorly, they might not end up being picked 
for any of the segments. For example,
+/// there's a query for Segments 1 (Seg1), 2 (Seg2) and 3 (Seg3). The servers 
are S1, S2, S3. The algorithm works as
+/// follows:
+/// - Step1: Process seg1. Fetch server rankings. Pick the best server.
+/// - Step2: Process seg2. Fetch server rankings (could have changed or not 
since Step 1). Pick the best server.
+/// - Step3: Process seg3. Fetch server rankings (could have changed or not 
since Step 2). Pick the best server.
+///
+/// If AdaptiveServerSelection is disabled, the selection algorithm will 
always evenly distribute the traffic to all
+/// replicas of each segment, and will try to select different replica id for 
each segment. The algorithm is very
+/// light-weight and will do best effort to balance the number of segments 
served by each selected server instance.
+///
 public class BalancedInstanceSelector extends BaseInstanceSelector {
 
   public BalancedInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
@@ -61,57 +61,40 @@ public class BalancedInstanceSelector extends 
BaseInstanceSelector {
     Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
     // No need to adjust this map per total segment numbers, as optional 
segments should be empty most of the time.
     Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
-    ServerSelectionContext ctx = new ServerSelectionContext(queryOptions);
-    // TODO: refactor to dedup the code and use a single for loop
+    ServerSelectionContext ctx = new ServerSelectionContext(queryOptions, 
_config);
     Map<Integer, Integer> poolToSegmentCount = new HashMap<>();
-    if (_priorityPoolInstanceSelector != null) {
-      for (String segment : segments) {
-        List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
-        // NOTE: candidates can be null when there is no enabled instances for 
the segment, or the instance selector has
-        // not been updated (we update all components for routing in sequence)
-        if (candidates == null) {
-          continue;
-        }
-        SegmentInstanceCandidate candidate = 
_priorityPoolInstanceSelector.select(ctx, candidates);
+
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      // NOTE: candidates can be null when there are no enabled instances for 
the segment, or the instance selector has
+      // not been updated (we update all components for routing in sequence)
+      if (candidates == null) {
+        continue;
+      }
+      SegmentInstanceCandidate selectedCandidate;
+
+      if (_priorityPoolInstanceSelector != null) {
+        // Adaptive server selection is enabled
+        selectedCandidate = _priorityPoolInstanceSelector.select(ctx, 
candidates);
         // If candidates is not null, candidates is always non-empty because 
segments with no enabled online servers
         // are placed in segmentStates.getUnavailableSegments()
-        assert candidate != null;
-        poolToSegmentCount.merge(candidate.getPool(), 1, Integer::sum);
-        // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
-        // broker or server can skip it upon any issue to process it.
-        if (candidate.isOnline()) {
-          segmentToSelectedInstanceMap.put(segment, candidate.getInstance());
-        } else {
-          optionalSegmentToInstanceMap.put(segment, candidate.getInstance());
-        }
+        assert selectedCandidate != null;
+      } else if (ctx.isUseFixedReplica()) {
+        // candidates array is always sorted
+        selectedCandidate = 
candidates.get(_tableNameHashForFixedReplicaRouting % candidates.size());
+      } else {
+        selectedCandidate = candidates.get(requestId++ % candidates.size());
       }
-    } else {
-      boolean useFixedReplica = isUseFixedReplica(queryOptions);
-      for (String segment : segments) {
-        List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
-        // NOTE: candidates can be null when there is no enabled instances for 
the segment, or the instance selector has
-        // not been updated (we update all components for routing in sequence)
-        if (candidates == null) {
-          continue;
-        }
-        int selectedIdx;
-        if (useFixedReplica) {
-          // candidates array is always sorted
-          selectedIdx = _tableNameHashForFixedReplicaRouting % 
candidates.size();
-        } else {
-          selectedIdx = requestId++ % candidates.size();
-        }
-        SegmentInstanceCandidate selectedCandidate = 
candidates.get(selectedIdx);
-        poolToSegmentCount.merge(selectedCandidate.getPool(), 1, Integer::sum);
-        // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
-        // broker or server can skip it upon any issue to process it.
-        if (selectedCandidate.isOnline()) {
-          segmentToSelectedInstanceMap.put(segment, 
selectedCandidate.getInstance());
-        } else {
-          optionalSegmentToInstanceMap.put(segment, 
selectedCandidate.getInstance());
-        }
+      poolToSegmentCount.merge(selectedCandidate.getPool(), 1, Integer::sum);
+      // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
+      // broker or server can skip it upon any issue to process it.
+      if (selectedCandidate.isOnline()) {
+        segmentToSelectedInstanceMap.put(segment, 
selectedCandidate.getInstance());
+      } else {
+        optionalSegmentToInstanceMap.put(segment, 
selectedCandidate.getInstance());
       }
     }
+
     for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) {
       _brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES, 
entry.getValue(),
         BrokerMetrics.getTagForPreferredPool(queryOptions), 
String.valueOf(entry.getKey()));
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index c88d3102e62..5acd67f25b5 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -39,7 +39,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.PriorityPoolInstanceSelector;
-import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -47,7 +46,6 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -57,33 +55,32 @@ import org.slf4j.LoggerFactory;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID;
 
 
-/**
- * Base implementation of instance selector. Selector maintains a map from 
segment to enabled ONLINE/CONSUMING server
- * instances that serves the segment and a set of unavailable segments (no 
enabled instance or all enabled instances are
- * in OFFLINE/ERROR state).
- * <p>
- * Special handling of new segment: It is common for new segment to be 
partially available or not available at all in
- * all instances.
- * 1) We don't report new segment as unavailable segments.
- * 2) To avoid creating hotspot instances, unavailable instances for new 
segment won't be excluded for instance
- * selection. When it is selected, we don't serve the new segment.
- * <p>
- * Definition of new segment:
- * 1) Segment created more than 5 minutes ago.
- * - If we first see a segment via initialization, we look up segment creation 
time from zookeeper.
- * - If we first see a segment via onAssignmentChange initialization, we use 
the calling time of onAssignmentChange
- * as approximation.
- * 2) We retire new segment as old when:
- * - The creation time is more than 5 minutes ago
- * - Any instance for new segment is in ERROR state
- * - External view for segment converges with ideal state
- *
- * Note that this implementation means:
- * 1) Inconsistent selection of new segments across queries (some queries will 
serve new segments and others won't).
- * 2) When there is no state update from helix, new segments won't be retired 
because of the time passing (those with
- * creation time more than 5 minutes ago).
- * TODO: refresh new/old segment state where there is no update from helix for 
long time.
- */
+/// Base implementation of instance selector. Selector maintains a map from 
segment to enabled ONLINE/CONSUMING server
+/// instances that serve the segment and a set of unavailable segments (no 
enabled instance or all enabled instances
+/// are in OFFLINE/ERROR state).
+///
+/// Special handling of new segments: It is common for new segments to be 
partially available or not available at all in
+/// all instances.
+/// 1) We don't report new segment as unavailable segments.
+/// 2) To avoid creating hotspot instances, unavailable instances for new 
segment won't be excluded for instance
+/// selection. When it is selected, we don't serve the new segment.
+///
+/// Definition of new segment:
+/// 1) Segment created less than 5 minutes ago.
+/// - If we first see a segment via initialization, we look up segment 
creation time from zookeeper.
+/// - If we first see a segment via onAssignmentChange initialization, we use 
the calling time of onAssignmentChange
+/// as approximation.
+/// 2) We retire new segment as old when:
+/// - The creation time is more than 5 minutes ago
+/// - Any instance for new segment is in ERROR state
+/// - External view for segment converges with ideal state
+///
+/// Note that this implementation means:
+/// 1) Inconsistent selection of new segments across queries (some queries 
will serve new segments and others won't).
+/// 2) When there is no state update from helix, new segments won't be retired 
because of the time passing (those with
+/// creation time more than 5 minutes ago).
+/// TODO: refresh new/old segment state when there is no update from helix 
for a long time.
+///
 abstract class BaseInstanceSelector implements InstanceSelector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
   // To prevent int overflow, reset the request id once it reaches this value
@@ -96,7 +93,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   // Will be null if and only if adaptiveServerSelector is null
   final PriorityPoolInstanceSelector _priorityPoolInstanceSelector;
   final Clock _clock;
-  final boolean _useFixedReplica;
+  final InstanceSelectorConfig _config;
   final long _newSegmentExpirationTimeInSeconds;
   final boolean _emitSinglePoolSegmentsMetric;
   final int _tableNameHashForFixedReplicaRouting;
@@ -120,7 +117,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
     _clock = clock;
-    _useFixedReplica = config.isUseFixedReplica();
+    _config = config;
     _newSegmentExpirationTimeInSeconds = 
config.getNewSegmentExpirationTimeInSeconds();
     _emitSinglePoolSegmentsMetric = 
config.shouldEmitSinglePoolSegmentsMetrics();
     // Using raw table name to ensure queries spanning across REALTIME and 
OFFLINE tables are routed to the same
@@ -131,7 +128,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
 
     _priorityPoolInstanceSelector =
         _adaptiveServerSelector == null ? null : new 
PriorityPoolInstanceSelector(_adaptiveServerSelector);
-    if (_adaptiveServerSelector != null && _useFixedReplica) {
+    if (_adaptiveServerSelector != null && config.isUseFixedReplica()) {
       throw new IllegalArgumentException(
           "AdaptiveServerSelector and consistent routing cannot be enabled at 
the same time");
     }
@@ -248,13 +245,11 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     }
   }
 
-  /**
-   * Updates the segment maps based on the given ideal state, external view, 
online segments (segments with
-   * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}) and new segments.
-   * After this update:
-   * - Old segments' online instances should be tracked in 
_oldSegmentCandidatesMap
-   * - New segments' state (creation time and candidate instances) should be 
tracked in _newSegmentStateMap
-   */
+  /// Updates the segment maps based on the given ideal state, external view, 
online segments (segments with
+  /// ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@code SegmentPreSelector}) and new
+  /// segments. After this update:
+  /// - Old segments' online instances should be tracked in 
_oldSegmentCandidatesMap
+  /// - New segments' state (creation time and candidate instances) should be 
tracked in _newSegmentStateMap
   void updateSegmentMaps(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments,
       Map<String, Long> newSegmentCreationTimeMap) {
     _oldSegmentCandidatesMap.clear();
@@ -262,7 +257,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
 
     Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
-    int count = 0;
+    int numSinglePoolSegments = 0;
     Set<Integer> pools = new HashSet<>();
     for (String segment : onlineSegments) {
       Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
@@ -311,12 +306,12 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
           pools.add(getPool(instance));
         }
         if (pools.size() < 2) {
-          count++;
+          numSinglePoolSegments++;
         }
       }
     }
     if (_emitSinglePoolSegmentsMetric) {
-      _brokerMetrics.addMeteredTableValue(_tableNameWithType, 
BrokerMeter.SINGLE_POOL_SEGMENTS, count);
+      _brokerMetrics.addMeteredTableValue(_tableNameWithType, 
BrokerMeter.SINGLE_POOL_SEGMENTS, numSinglePoolSegments);
     }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: 
{}", _newSegmentStateMap.keySet(),
@@ -393,8 +388,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   /**
    * {@inheritDoc}
    *
-   * <p>Updates the cached enabled instances and re-calculates {@code 
segmentToEnabledInstancesMap} and
-   * {@code unavailableSegments} based on the cached states.
+   * <p>Updates the cached enabled instances and re-calculates {@link 
#_segmentStates}.
    */
   @Override
   public void onInstancesChange(Set<String> enabledInstances, List<String> 
changedInstances) {
@@ -405,9 +399,8 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   /**
    * {@inheritDoc}
    *
-   * <p>Updates the cached maps ({@code segmentToOnlineInstancesMap}, {@code 
segmentToOfflineInstancesMap} and
-   * {@code instanceToSegmentsMap}) and re-calculates {@code 
segmentToEnabledInstancesMap} and
-   * {@code unavailableSegments} based on the cached states.
+   * <p>Updates the cached maps ({@link #_oldSegmentCandidatesMap} and {@link 
#_newSegmentStateMap}) and re-calculates
+   * {@link #_segmentStates} based on the cached states.
    */
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
@@ -477,11 +470,6 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     }
   }
 
-  protected boolean isUseFixedReplica(Map<String, String> queryOptions) {
-    Boolean queryOption = QueryOptionsUtils.isUseFixedReplica(queryOptions);
-    return queryOption != null ? queryOption : _useFixedReplica;
-  }
-
   @Override
   public Set<String> getServingInstances() {
     return _segmentStates.getServingInstances();
@@ -503,7 +491,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   /**
    * Selects the server instances for the given segments based on the request 
id and segment states. Returns two maps
    * from segment to selected server instance hosting the segment. The 2nd map 
is for optional segments. The optional
-   * segments are used to get the new segments that is not online yet. Instead 
of simply skipping them by broker at
+   * segments are used to get the new segments that are not online yet. 
Instead of simply skipping them by broker at
    * routing time, we can send them to servers and let servers decide how to 
handle them.
    */
   abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/> 
select(List<String> segments,
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 25f57cbbffd..f49de38ac57 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -69,11 +69,11 @@ public class InstanceSelectorFactory {
     long newSegmentExpirationTimeInSeconds =
         
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS,
         
CommonConstants.Broker.DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS);
-    boolean emitSinglePoolSegments = brokerConfig.getProperty(
+    boolean emitSinglePoolSegmentsMetric = brokerConfig.getProperty(
             
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_SINGLE_POOL_SEGMENTS_METRIC,
             CommonConstants.Broker.DEFAULT_ENABLE_SINGLE_POOL_SEGMENTS_METRIC);
     InstanceSelectorConfig config = new 
InstanceSelectorConfig(useFixedReplica, newSegmentExpirationTimeInSeconds,
-            emitSinglePoolSegments);
+            emitSinglePoolSegmentsMetric);
     if (routingConfig != null) {
       if 
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
           || (tableConfig.getTableType() == TableType.OFFLINE && 
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index b42ee431429..40e46adbc78 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -34,6 +34,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import 
org.apache.pinot.broker.routing.adaptiveserverselector.ServerSelectionContext;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -90,7 +91,8 @@ public class MultiStageReplicaGroupSelector extends 
BaseInstanceSelector {
     // Create a copy of InstancePartitions to avoid race-condition with 
event-listeners above.
     InstancePartitions instancePartitions = _instancePartitions;
     int replicaGroupSelected;
-    if (isUseFixedReplica(queryOptions)) {
+    ServerSelectionContext ctx = new ServerSelectionContext(queryOptions, 
_config);
+    if (ctx.isUseFixedReplica()) {
       // When using sticky routing, we want to iterate over the 
instancePartitions in order to ensure deterministic
       // selection of replica group across queries i.e. same instance replica 
group id is picked each time.
       // Since the instances within a selected replica group are iterated in 
order, the assignment within a selected
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index f087e98d821..88793eac7cb 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -46,8 +47,8 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
  * that have the same segments assigned. For an example, if S1 is a server in 
replica-group 1, and it has mirror server
  * S2 in replica-group 2 and S3 in replica-group 3. All segments assigned to 
S1 will also be assigned to S2 and S3. In
  * stable scenario (external view matches ideal state), all segments assigned 
to S1 will have the same enabled instances
- * of [S1, S2, S3] sorted (in alphabetical order). If we always pick the same 
index of enabled instances for all
- * segments, only one of S1, S2, S3 will be picked, so it is guaranteed that 
we pick the least server instances for the
+ * of [S1, S2, S3] sorted (in alphabetical order). If we pick the same index 
of enabled instances for all segments for a
+ * request, only one of S1, S2, S3 will be picked, so it is guaranteed that we 
pick the least server instances for the
  * request (there is no guarantee on choosing servers from the same 
replica-group though). In transitioning/error
  * scenario (external view does not match ideal state), there is no guarantee 
on picking the least server instances, but
  * the traffic is guaranteed to be evenly distributed to all available 
instances to avoid overwhelming hotspot servers.
@@ -60,7 +61,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
  * <p>If AdaptiveServerSelection is enabled, a single snapshot of the server 
ranking is fetched. This ranking is
  * referenced to pick the best available server for each segment. The 
algorithm ends up picking the minimum number of
  * servers required to process a query because it references a single snapshot 
of the server rankings. Currently,
- * NUM_REPLICA_GROUPS_TO_QUERY is not supported is AdaptiveServerSelection is 
enabled.
+ * NUM_REPLICA_GROUPS_TO_QUERY is not supported if AdaptiveServerSelection is 
enabled.
  */
 public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
 
@@ -73,7 +74,7 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
   @Override
   Pair<Map<String, String>, Map<String, String>> select(List<String> segments, 
int requestId,
       SegmentStates segmentStates, Map<String, String> queryOptions) {
-    ServerSelectionContext ctx = new ServerSelectionContext(queryOptions);
+    ServerSelectionContext ctx = new ServerSelectionContext(queryOptions, 
_config);
     if (_adaptiveServerSelector != null) {
       // Adaptive Server Selection is enabled.
       List<SegmentInstanceCandidate> candidateServers = 
fetchCandidateServersForQuery(segments, segmentStates);
@@ -85,42 +86,52 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
       for (int idx = 0; idx < serverRankList.size(); idx++) {
         serverRankMap.put(serverRankList.get(idx), idx);
       }
-      return selectServersUsingAdaptiveServerSelector(segments, requestId, 
segmentStates, serverRankMap, ctx);
+      return selectServers(segments, requestId, segmentStates, serverRankMap, 
ctx);
     } else {
       // Adaptive Server Selection is NOT enabled.
-      return selectServersUsingRoundRobin(segments, requestId, segmentStates, 
ctx);
+      return selectServers(segments, requestId, segmentStates, null, ctx);
     }
   }
 
-  private Pair<Map<String, String>, Map<String, String>> 
selectServersUsingRoundRobin(List<String> segments,
-      int requestId, SegmentStates segmentStates, ServerSelectionContext ctx) {
+  private Pair<Map<String, String>, Map<String, String>> 
selectServers(List<String> segments, int requestId,
+      SegmentStates segmentStates, @Nullable Map<String, Integer> 
serverRankMap, ServerSelectionContext ctx) {
+
     Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
     // No need to adjust this map per total segment numbers, as optional 
segments should be empty most of the time.
     Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
     Map<Integer, Integer> poolToSegmentCount = new HashMap<>();
-    boolean useFixedReplica = isUseFixedReplica(ctx.getQueryOptions());
+    boolean useFixedReplica = ctx.isUseFixedReplica();
     Integer numReplicaGroupsToQuery = 
QueryOptionsUtils.getNumReplicaGroupsToQuery(ctx.getQueryOptions());
     int numReplicaGroups = numReplicaGroupsToQuery != null ? 
numReplicaGroupsToQuery : 1;
     int replicaOffset = 0;
     for (String segment : segments) {
       List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
-      // NOTE: candidates can be null when there is no enabled instances for 
the segment, or the instance selector has
+      // NOTE: candidates can be null when there are no enabled instances for 
the segment, or the instance selector has
       // not been updated (we update all components for routing in sequence)
       if (candidates == null) {
         continue;
       }
-      // Round robin selection.
-      int numCandidates = candidates.size();
-      int instanceIdx;
 
+      // Round-robin selection (default behavior)
+      int numCandidates = candidates.size();
+      int instanceIdx = (requestId + replicaOffset) % numCandidates;
+      SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
       if (useFixedReplica) {
-        // candidates array is always sorted
-        instanceIdx = (_tableNameHashForFixedReplicaRouting + replicaOffset) % 
numCandidates;
-      } else {
-        instanceIdx = (requestId + replicaOffset) % numCandidates;
+        // Fixed replica routing takes precedence; Adaptive Server Selection is not applied here.
+        // The candidates array is always sorted
+        selectedInstance = 
candidates.get((_tableNameHashForFixedReplicaRouting + replicaOffset) % 
numCandidates);
+      } else if (MapUtils.isNotEmpty(serverRankMap)) {
+        // Adaptive Server Selection is enabled.
+        // Use the instance with the best rank if all servers have stats 
populated, else use the round-robin selected
+        // instance
+        selectedInstance = candidates.stream()
+            .anyMatch(candidate -> 
!serverRankMap.containsKey(candidate.getInstance()))
+            ? selectedInstance
+            : candidates.stream()
+                .min(Comparator.comparingInt(candidate -> 
serverRankMap.get(candidate.getInstance())))
+                .orElse(selectedInstance);
       }
 
-      SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
       poolToSegmentCount.merge(selectedInstance.getPool(), 1, Integer::sum);
       // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
       // broker or server can skip it upon any issue to process it.
@@ -141,51 +152,6 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
     return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
   }
 
-  private Pair<Map<String, String>, Map<String, String>> 
selectServersUsingAdaptiveServerSelector(List<String> segments,
-      int requestId, SegmentStates segmentStates, Map<String, Integer> 
serverRankMap,
-      ServerSelectionContext ctx) {
-    Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
-    // No need to adjust this map per total segment numbers, as optional 
segments should be empty most of the time.
-    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
-    Map<Integer, Integer> poolToSegmentCount = new HashMap<>();
-    for (String segment : segments) {
-      // NOTE: candidates can be null when there is no enabled instances for 
the segment, or the instance selector has
-      // not been updated (we update all components for routing in sequence)
-      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
-      if (candidates == null) {
-        continue;
-      }
-
-      // Round Robin selection
-      int roundRobinInstanceIdx = requestId % candidates.size();
-      SegmentInstanceCandidate selectedInstance = 
candidates.get(roundRobinInstanceIdx);
-
-      // Adaptive Server Selection logic
-      if (!serverRankMap.isEmpty()) {
-        // Use instance with the best rank if all servers have stats 
populated, if not use round-robin selected instance
-        selectedInstance = candidates.stream()
-            .anyMatch(candidate -> 
!serverRankMap.containsKey(candidate.getInstance()))
-            ? candidates.get(roundRobinInstanceIdx)
-            : candidates.stream()
-                .min(Comparator.comparingInt(candidate -> 
serverRankMap.get(candidate.getInstance())))
-                .orElse(candidates.get(roundRobinInstanceIdx));
-      }
-      poolToSegmentCount.merge(selectedInstance.getPool(), 1, Integer::sum);
-      // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
-      // broker or server can skip it upon any issue to process it.
-      if (selectedInstance.isOnline()) {
-        segmentToSelectedInstanceMap.put(segment, 
selectedInstance.getInstance());
-      } else {
-        optionalSegmentToInstanceMap.put(segment, 
selectedInstance.getInstance());
-      }
-    }
-    for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) {
-      _brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES, 
entry.getValue(),
-          BrokerMetrics.getTagForPreferredPool(ctx.getQueryOptions()), 
String.valueOf(entry.getKey()));
-    }
-    return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
-  }
-
   private List<SegmentInstanceCandidate> 
fetchCandidateServersForQuery(List<String> segments,
       SegmentStates segmentStates) {
     Map<String, SegmentInstanceCandidate> candidateServers = new HashMap<>();
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
index 6dcb88eb54c..7b251eb4397 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
@@ -58,4 +58,10 @@ public class SegmentInstanceCandidate {
   public int getPool() {
     return _pool;
   }
+
+  @Override
+  public String toString() {
+    return "SegmentInstanceCandidate{" + "_instance='" + _instance + '\'' + ", 
_online=" + _online + ", _pool=" + _pool
+        + '}';
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
index c8809549b71..bfb56dd7bff 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
@@ -25,16 +25,15 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 
 
-/**
- * The {@code SegmentStates} contains the candidate instances for each 
segment, and the unavailable segments for routing
- * purpose.
- *
- * For old segments, the instance candidates should always have online flag 
set to true.
- * For old segments without any enabled instance candidates, we report them as 
unavailable segments.
- *
- * For new segments, the online flag within the instance candidates indicates 
whether the instance is online or not.
- * We don't report new segments as unavailable segments because it is valid 
for new segments to be offline.
- */
+/// The {@code SegmentStates} contains the candidate instances for each 
segment, and the unavailable segments for
+/// routing purpose.
+///
+/// For old segments, the instance candidates should always have online flag 
set to true.
+/// For old segments without any enabled instance candidates, we report them 
as unavailable segments.
+///
+/// For new segments, the online flag within the instance candidates indicates 
whether the instance is online or not.
+/// We don't report new segments as unavailable segments because it is valid 
for new segments to be offline.
+///
 @Immutable
 public class SegmentStates {
   private final Map<String, List<SegmentInstanceCandidate>> 
_instanceCandidatesMap;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 527ee56dc6e..bbe08624457 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -58,14 +58,14 @@ public class BrokerMeter implements AbstractMetrics.Meter {
    */
   public static final BrokerMeter POOL_QUERIES = create("POOL_QUERIES", 
"routing", false);
   /**
-   * Number of segment selected per pool during query execution.
+   * Number of segments selected per pool during query execution.
    * <p>
    * This metric is not global and is attached to a particular pool.
-   * Currently this counter include single stage queries only.
+   * Currently, this counter includes single-stage queries only.
    * <p>
-   * Let's say the query option orderedReferredPools is set and a few nodes in 
the preferred pool are down.
-   * The other metric {@link #POOL_QUERIES} shows the traffic are relatively 
equal over pool.
-   * This metric is still going to show that most of segments are still 
selected from the preferred pool.
+   * Let's say the query option orderedPreferredPools is set and a few nodes 
in the preferred pool are down.
+   * The other metric {@link #POOL_QUERIES} shows the traffic is relatively 
equal across pools.
+   * This metric is still going to show that most of the segments are still 
selected from the preferred pool.
    */
   public static final BrokerMeter POOL_SEG_QUERIES = 
create("POOL_SEG_QUERIES", "routing", false);
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to