This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b4e03b1252 Minor cleanups and refactors in instance selector module
(#16744)
4b4e03b1252 is described below
commit 4b4e03b1252c5333c47de63f3567bf6e7e89cc07
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Sep 4 09:12:14 2025 +0530
Minor cleanups and refactors in instance selector module (#16744)
---
.../ServerSelectionContext.java | 12 ++-
.../instanceselector/BalancedInstanceSelector.java | 103 +++++++++------------
.../instanceselector/BaseInstanceSelector.java | 94 ++++++++-----------
.../instanceselector/InstanceSelectorFactory.java | 4 +-
.../MultiStageReplicaGroupSelector.java | 4 +-
.../ReplicaGroupInstanceSelector.java | 92 ++++++------------
.../instanceselector/SegmentInstanceCandidate.java | 6 ++
.../routing/instanceselector/SegmentStates.java | 19 ++--
.../apache/pinot/common/metrics/BrokerMeter.java | 10 +-
9 files changed, 149 insertions(+), 195 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
index 1132b2c4b25..dc17ea93a3e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java
@@ -21,6 +21,7 @@ package
org.apache.pinot.broker.routing.adaptiveserverselector;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.broker.routing.instanceselector.InstanceSelectorConfig;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -29,6 +30,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
* servers are selected for query execution.
*/
public class ServerSelectionContext {
+ private final InstanceSelectorConfig _config;
/**
* Map of query options that can influence server selection behavior.
* These options are passed into the context class to avoid endless
constructor argument changes
@@ -44,6 +46,7 @@ public class ServerSelectionContext {
private final Map<String, String> _queryOptions;
// If some query options need further processing, store the parsing result
below to avoid duplicate parsing.
private final List<Integer> _orderedPreferredPools;
+ private final boolean _isUseFixedReplica;
/**
* Creates a new server selection context with the given query options.
@@ -52,15 +55,22 @@ public class ServerSelectionContext {
*
* @param queryOptions map of query options that may contain server
selection preferences
*/
- public ServerSelectionContext(Map<String, String> queryOptions) {
+ public ServerSelectionContext(Map<String, String> queryOptions,
InstanceSelectorConfig instanceSelectorConfig) {
_queryOptions = queryOptions == null ? Collections.emptyMap() :
queryOptions;
+ _config = instanceSelectorConfig;
_orderedPreferredPools =
QueryOptionsUtils.getOrderedPreferredPools(_queryOptions);
+ Boolean isUseFixedReplica =
QueryOptionsUtils.isUseFixedReplica(_queryOptions);
+ _isUseFixedReplica = isUseFixedReplica != null ? isUseFixedReplica :
instanceSelectorConfig.isUseFixedReplica();
}
public Map<String, String> getQueryOptions() {
return _queryOptions;
}
+ public boolean isUseFixedReplica() {
+ return _isUseFixedReplica;
+ }
+
public List<Integer> getOrderedPreferredPools() {
return _orderedPreferredPools;
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 35fe7ef002f..4afd3a26dcd 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -32,21 +32,21 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.HashUtil;
-/**
- * Instance selector to balance the number of segments served by each selected
server instance.
- * <p>If AdaptiveServerSelection is enabled, the request is routed to the best
available server for a segment
- * when it is processed below. This is a best effort approach in distributing
the query to all available servers.
- * If some servers are performing poorly, they might not end up being picked
for any of the segments. For example,
- * there's a query for Segments 1 (Seg1), 2 (Seg2) and Seg3). The servers are
S1, S2, S3. The algorithm works as
- * follows:
- * Step1: Process seg1. Fetch server rankings. Pick the best server.
- * Step2: Process seg2. Fetch server rankings (could have changed or not
since Step 1). Pick the best server.
- * Step3: Process seg3. Fetch server rankings (could have changed or not
since Step 2). Pick the best server.
- *
- * <p>If AdaptiveServerSelection is disabled, the selection algorithm will
always evenly distribute the traffic to all
- * replicas of each segment, and will try to select different replica id for
each segment. The algorithm is very
- * light-weight and will do best effort to balance the number of segments
served by each selected server instance.
- */
+/// Instance selector to balance the number of segments served by each
selected server instance.
+///
+/// If AdaptiveServerSelection is enabled, the request is routed to the best
available server for a segment
+/// when it is processed below. This is a best effort approach in distributing
the query to all available servers.
+/// If some servers are performing poorly, they might not end up being picked
for any of the segments. For example,
+/// there's a query for Segments 1 (Seg1), 2 (Seg2) and 3 (Seg3). The servers
are S1, S2, S3. The algorithm works as
+/// follows:
+/// - Step1: Process seg1. Fetch server rankings. Pick the best server.
+/// - Step2: Process seg2. Fetch server rankings (could have changed or not
since Step 1). Pick the best server.
+/// - Step3: Process seg3. Fetch server rankings (could have changed or not
since Step 2). Pick the best server.
+///
+/// If AdaptiveServerSelection is disabled, the selection algorithm will
always evenly distribute the traffic to all
+/// replicas of each segment, and will try to select different replica id for
each segment. The algorithm is very
+/// light-weight and will do best effort to balance the number of segments
served by each selected server instance.
+///
public class BalancedInstanceSelector extends BaseInstanceSelector {
public BalancedInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
@@ -61,57 +61,40 @@ public class BalancedInstanceSelector extends
BaseInstanceSelector {
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
// No need to adjust this map per total segment numbers, as optional
segments should be empty most of the time.
Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
- ServerSelectionContext ctx = new ServerSelectionContext(queryOptions);
- // TODO: refactor to dedup the code and use a single for loop
+ ServerSelectionContext ctx = new ServerSelectionContext(queryOptions,
_config);
Map<Integer, Integer> poolToSegmentCount = new HashMap<>();
- if (_priorityPoolInstanceSelector != null) {
- for (String segment : segments) {
- List<SegmentInstanceCandidate> candidates =
segmentStates.getCandidates(segment);
- // NOTE: candidates can be null when there is no enabled instances for
the segment, or the instance selector has
- // not been updated (we update all components for routing in sequence)
- if (candidates == null) {
- continue;
- }
- SegmentInstanceCandidate candidate =
_priorityPoolInstanceSelector.select(ctx, candidates);
+
+ for (String segment : segments) {
+ List<SegmentInstanceCandidate> candidates =
segmentStates.getCandidates(segment);
+ // NOTE: candidates can be null when there are no enabled instances for
the segment, or the instance selector has
+ // not been updated (we update all components for routing in sequence)
+ if (candidates == null) {
+ continue;
+ }
+ SegmentInstanceCandidate selectedCandidate;
+
+ if (_priorityPoolInstanceSelector != null) {
+ // Adaptive server selection is enabled
+ selectedCandidate = _priorityPoolInstanceSelector.select(ctx,
candidates);
// If candidates is not null, candidates is always non-empty because
segments with no enabled online servers
// are placed in segmentStates.getUnavailableSegments()
- assert candidate != null;
- poolToSegmentCount.merge(candidate.getPool(), 1, Integer::sum);
- // This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
- // broker or server can skip it upon any issue to process it.
- if (candidate.isOnline()) {
- segmentToSelectedInstanceMap.put(segment, candidate.getInstance());
- } else {
- optionalSegmentToInstanceMap.put(segment, candidate.getInstance());
- }
+ assert selectedCandidate != null;
+ } else if (ctx.isUseFixedReplica()) {
+ // candidates array is always sorted
+ selectedCandidate =
candidates.get(_tableNameHashForFixedReplicaRouting % candidates.size());
+ } else {
+ selectedCandidate = candidates.get(requestId++ % candidates.size());
}
- } else {
- boolean useFixedReplica = isUseFixedReplica(queryOptions);
- for (String segment : segments) {
- List<SegmentInstanceCandidate> candidates =
segmentStates.getCandidates(segment);
- // NOTE: candidates can be null when there is no enabled instances for
the segment, or the instance selector has
- // not been updated (we update all components for routing in sequence)
- if (candidates == null) {
- continue;
- }
- int selectedIdx;
- if (useFixedReplica) {
- // candidates array is always sorted
- selectedIdx = _tableNameHashForFixedReplicaRouting %
candidates.size();
- } else {
- selectedIdx = requestId++ % candidates.size();
- }
- SegmentInstanceCandidate selectedCandidate =
candidates.get(selectedIdx);
- poolToSegmentCount.merge(selectedCandidate.getPool(), 1, Integer::sum);
- // This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
- // broker or server can skip it upon any issue to process it.
- if (selectedCandidate.isOnline()) {
- segmentToSelectedInstanceMap.put(segment,
selectedCandidate.getInstance());
- } else {
- optionalSegmentToInstanceMap.put(segment,
selectedCandidate.getInstance());
- }
+ poolToSegmentCount.merge(selectedCandidate.getPool(), 1, Integer::sum);
+ // This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
+ // broker or server can skip it upon any issue to process it.
+ if (selectedCandidate.isOnline()) {
+ segmentToSelectedInstanceMap.put(segment,
selectedCandidate.getInstance());
+ } else {
+ optionalSegmentToInstanceMap.put(segment,
selectedCandidate.getInstance());
}
}
+
for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) {
_brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES,
entry.getValue(),
BrokerMetrics.getTagForPreferredPool(queryOptions),
String.valueOf(entry.getKey()));
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index c88d3102e62..5acd67f25b5 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -39,7 +39,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import
org.apache.pinot.broker.routing.adaptiveserverselector.PriorityPoolInstanceSelector;
-import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -47,7 +46,6 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.transport.ServerInstance;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -57,33 +55,32 @@ import org.slf4j.LoggerFactory;
import static
org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID;
-/**
- * Base implementation of instance selector. Selector maintains a map from
segment to enabled ONLINE/CONSUMING server
- * instances that serves the segment and a set of unavailable segments (no
enabled instance or all enabled instances are
- * in OFFLINE/ERROR state).
- * <p>
- * Special handling of new segment: It is common for new segment to be
partially available or not available at all in
- * all instances.
- * 1) We don't report new segment as unavailable segments.
- * 2) To avoid creating hotspot instances, unavailable instances for new
segment won't be excluded for instance
- * selection. When it is selected, we don't serve the new segment.
- * <p>
- * Definition of new segment:
- * 1) Segment created more than 5 minutes ago.
- * - If we first see a segment via initialization, we look up segment creation
time from zookeeper.
- * - If we first see a segment via onAssignmentChange initialization, we use
the calling time of onAssignmentChange
- * as approximation.
- * 2) We retire new segment as old when:
- * - The creation time is more than 5 minutes ago
- * - Any instance for new segment is in ERROR state
- * - External view for segment converges with ideal state
- *
- * Note that this implementation means:
- * 1) Inconsistent selection of new segments across queries (some queries will
serve new segments and others won't).
- * 2) When there is no state update from helix, new segments won't be retired
because of the time passing (those with
- * creation time more than 5 minutes ago).
- * TODO: refresh new/old segment state where there is no update from helix for
long time.
- */
+/// Base implementation of instance selector. Selector maintains a map from
segment to enabled ONLINE/CONSUMING server
+/// instances that serve the segment and a set of unavailable segments (no
enabled instance or all enabled instances
+/// are in OFFLINE/ERROR state).
+///
+/// Special handling of new segments: It is common for a new segment to be
partially available or not available at all in
+/// all instances.
+/// 1) We don't report new segment as unavailable segments.
+/// 2) To avoid creating hotspot instances, unavailable instances for new
segment won't be excluded for instance
+/// selection. When it is selected, we don't serve the new segment.
+///
+/// Definition of new segment:
+/// 1) Segment created less than 5 minutes ago.
+/// - If we first see a segment via initialization, we look up segment
creation time from zookeeper.
+/// - If we first see a segment via onAssignmentChange initialization, we use
the calling time of onAssignmentChange
+/// as approximation.
+/// 2) We retire new segment as old when:
+/// - The creation time is more than 5 minutes ago
+/// - Any instance for new segment is in ERROR state
+/// - External view for segment converges with ideal state
+///
+/// Note that this implementation means:
+/// 1) Inconsistent selection of new segments across queries (some queries
will serve new segments and others won't).
+/// 2) When there is no state update from helix, new segments won't be retired
because of the time passing (those with
+/// creation time more than 5 minutes ago).
+/// TODO: refresh new/old segment state where there is no update from helix
for long time.
+///
abstract class BaseInstanceSelector implements InstanceSelector {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
// To prevent int overflow, reset the request id once it reaches this value
@@ -96,7 +93,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
// Will be null if and only if adaptiveServerSelector is null
final PriorityPoolInstanceSelector _priorityPoolInstanceSelector;
final Clock _clock;
- final boolean _useFixedReplica;
+ final InstanceSelectorConfig _config;
final long _newSegmentExpirationTimeInSeconds;
final boolean _emitSinglePoolSegmentsMetric;
final int _tableNameHashForFixedReplicaRouting;
@@ -120,7 +117,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
_brokerMetrics = brokerMetrics;
_adaptiveServerSelector = adaptiveServerSelector;
_clock = clock;
- _useFixedReplica = config.isUseFixedReplica();
+ _config = config;
_newSegmentExpirationTimeInSeconds =
config.getNewSegmentExpirationTimeInSeconds();
_emitSinglePoolSegmentsMetric =
config.shouldEmitSinglePoolSegmentsMetrics();
// Using raw table name to ensure queries spanning across REALTIME and
OFFLINE tables are routed to the same
@@ -131,7 +128,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
_priorityPoolInstanceSelector =
_adaptiveServerSelector == null ? null : new
PriorityPoolInstanceSelector(_adaptiveServerSelector);
- if (_adaptiveServerSelector != null && _useFixedReplica) {
+ if (_adaptiveServerSelector != null && config.isUseFixedReplica()) {
throw new IllegalArgumentException(
"AdaptiveServerSelector and consistent routing cannot be enabled at
the same time");
}
@@ -248,13 +245,11 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
}
- /**
- * Updates the segment maps based on the given ideal state, external view,
online segments (segments with
- * ONLINE/CONSUMING instances in the ideal state and pre-selected by the
{@link SegmentPreSelector}) and new segments.
- * After this update:
- * - Old segments' online instances should be tracked in
_oldSegmentCandidatesMap
- * - New segments' state (creation time and candidate instances) should be
tracked in _newSegmentStateMap
- */
+ /// Updates the segment maps based on the given ideal state, external view,
online segments (segments with
+ /// ONLINE/CONSUMING instances in the ideal state and pre-selected by the
{@code SegmentPreSelector}) and new
+ /// segments. After this update:
+ /// - Old segments' online instances should be tracked in
_oldSegmentCandidatesMap
+ /// - New segments' state (creation time and candidate instances) should be
tracked in _newSegmentStateMap
void updateSegmentMaps(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments,
Map<String, Long> newSegmentCreationTimeMap) {
_oldSegmentCandidatesMap.clear();
@@ -262,7 +257,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
Map<String, Map<String, String>> externalViewAssignment =
externalView.getRecord().getMapFields();
- int count = 0;
+ int numSinglePoolSegments = 0;
Set<Integer> pools = new HashSet<>();
for (String segment : onlineSegments) {
Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
@@ -311,12 +306,12 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
pools.add(getPool(instance));
}
if (pools.size() < 2) {
- count++;
+ numSinglePoolSegments++;
}
}
}
if (_emitSinglePoolSegmentsMetric) {
- _brokerMetrics.addMeteredTableValue(_tableNameWithType,
BrokerMeter.SINGLE_POOL_SEGMENTS, count);
+ _brokerMetrics.addMeteredTableValue(_tableNameWithType,
BrokerMeter.SINGLE_POOL_SEGMENTS, numSinglePoolSegments);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap:
{}", _newSegmentStateMap.keySet(),
@@ -393,8 +388,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
/**
* {@inheritDoc}
*
- * <p>Updates the cached enabled instances and re-calculates {@code
segmentToEnabledInstancesMap} and
- * {@code unavailableSegments} based on the cached states.
+ * <p>Updates the cached enabled instances and re-calculates {@link
#_segmentStates}.
*/
@Override
public void onInstancesChange(Set<String> enabledInstances, List<String>
changedInstances) {
@@ -405,9 +399,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
/**
* {@inheritDoc}
*
- * <p>Updates the cached maps ({@code segmentToOnlineInstancesMap}, {@code
segmentToOfflineInstancesMap} and
- * {@code instanceToSegmentsMap}) and re-calculates {@code
segmentToEnabledInstancesMap} and
- * {@code unavailableSegments} based on the cached states.
+ * <p>Updates the cached maps ({@link #_oldSegmentCandidatesMap} and {@link
#_newSegmentStateMap}) and re-calculates
+ * {@link #_segmentStates} based on the cached states.
*/
@Override
public void onAssignmentChange(IdealState idealState, ExternalView
externalView, Set<String> onlineSegments) {
@@ -477,11 +470,6 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
}
- protected boolean isUseFixedReplica(Map<String, String> queryOptions) {
- Boolean queryOption = QueryOptionsUtils.isUseFixedReplica(queryOptions);
- return queryOption != null ? queryOption : _useFixedReplica;
- }
-
@Override
public Set<String> getServingInstances() {
return _segmentStates.getServingInstances();
@@ -503,7 +491,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
/**
* Selects the server instances for the given segments based on the request
id and segment states. Returns two maps
* from segment to selected server instance hosting the segment. The 2nd map
is for optional segments. The optional
- * segments are used to get the new segments that is not online yet. Instead
of simply skipping them by broker at
+ * segments are used to get the new segments that are not online yet.
Instead of simply skipping them by broker at
* routing time, we can send them to servers and let servers decide how to
handle them.
*/
abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/>
select(List<String> segments,
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 25f57cbbffd..f49de38ac57 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -69,11 +69,11 @@ public class InstanceSelectorFactory {
long newSegmentExpirationTimeInSeconds =
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS,
CommonConstants.Broker.DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS);
- boolean emitSinglePoolSegments = brokerConfig.getProperty(
+ boolean emitSinglePoolSegmentsMetric = brokerConfig.getProperty(
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_SINGLE_POOL_SEGMENTS_METRIC,
CommonConstants.Broker.DEFAULT_ENABLE_SINGLE_POOL_SEGMENTS_METRIC);
InstanceSelectorConfig config = new
InstanceSelectorConfig(useFixedReplica, newSegmentExpirationTimeInSeconds,
- emitSinglePoolSegments);
+ emitSinglePoolSegmentsMetric);
if (routingConfig != null) {
if
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
|| (tableConfig.getTableType() == TableType.OFFLINE &&
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index b42ee431429..40e46adbc78 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -34,6 +34,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import
org.apache.pinot.broker.routing.adaptiveserverselector.ServerSelectionContext;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -90,7 +91,8 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
// Create a copy of InstancePartitions to avoid race-condition with
event-listeners above.
InstancePartitions instancePartitions = _instancePartitions;
int replicaGroupSelected;
- if (isUseFixedReplica(queryOptions)) {
+ ServerSelectionContext ctx = new ServerSelectionContext(queryOptions,
_config);
+ if (ctx.isUseFixedReplica()) {
// When using sticky routing, we want to iterate over the
instancePartitions in order to ensure deterministic
// selection of replica group across queries i.e. same instance replica
group id is picked each time.
// Since the instances within a selected replica group are iterated in
order, the assignment within a selected
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index f087e98d821..88793eac7cb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -46,8 +47,8 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
* that have the same segments assigned. For an example, if S1 is a server in
replica-group 1, and it has mirror server
* S2 in replica-group 2 and S3 in replica-group 3. All segments assigned to
S1 will also be assigned to S2 and S3. In
* stable scenario (external view matches ideal state), all segments assigned
to S1 will have the same enabled instances
- * of [S1, S2, S3] sorted (in alphabetical order). If we always pick the same
index of enabled instances for all
- * segments, only one of S1, S2, S3 will be picked, so it is guaranteed that
we pick the least server instances for the
+ * of [S1, S2, S3] sorted (in alphabetical order). If we pick the same index
of enabled instances for all segments for a
+ * request, only one of S1, S2, S3 will be picked, so it is guaranteed that we
pick the least server instances for the
* request (there is no guarantee on choosing servers from the same
replica-group though). In transitioning/error
* scenario (external view does not match ideal state), there is no guarantee
on picking the least server instances, but
* the traffic is guaranteed to be evenly distributed to all available
instances to avoid overwhelming hotspot servers.
@@ -60,7 +61,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
* <p>If AdaptiveServerSelection is enabled, a single snapshot of the server
ranking is fetched. This ranking is
* referenced to pick the best available server for each segment. The
algorithm ends up picking the minimum number of
* servers required to process a query because it references a single snapshot
of the server rankings. Currently,
- * NUM_REPLICA_GROUPS_TO_QUERY is not supported is AdaptiveServerSelection is
enabled.
+ * NUM_REPLICA_GROUPS_TO_QUERY is not supported if AdaptiveServerSelection is
enabled.
*/
public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
@@ -73,7 +74,7 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
@Override
Pair<Map<String, String>, Map<String, String>> select(List<String> segments,
int requestId,
SegmentStates segmentStates, Map<String, String> queryOptions) {
- ServerSelectionContext ctx = new ServerSelectionContext(queryOptions);
+ ServerSelectionContext ctx = new ServerSelectionContext(queryOptions,
_config);
if (_adaptiveServerSelector != null) {
// Adaptive Server Selection is enabled.
List<SegmentInstanceCandidate> candidateServers =
fetchCandidateServersForQuery(segments, segmentStates);
@@ -85,42 +86,52 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
for (int idx = 0; idx < serverRankList.size(); idx++) {
serverRankMap.put(serverRankList.get(idx), idx);
}
- return selectServersUsingAdaptiveServerSelector(segments, requestId,
segmentStates, serverRankMap, ctx);
+ return selectServers(segments, requestId, segmentStates, serverRankMap,
ctx);
} else {
// Adaptive Server Selection is NOT enabled.
- return selectServersUsingRoundRobin(segments, requestId, segmentStates,
ctx);
+ return selectServers(segments, requestId, segmentStates, null, ctx);
}
}
- private Pair<Map<String, String>, Map<String, String>>
selectServersUsingRoundRobin(List<String> segments,
- int requestId, SegmentStates segmentStates, ServerSelectionContext ctx) {
+ private Pair<Map<String, String>, Map<String, String>>
selectServers(List<String> segments, int requestId,
+ SegmentStates segmentStates, @Nullable Map<String, Integer>
serverRankMap, ServerSelectionContext ctx) {
+
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
// No need to adjust this map per total segment numbers, as optional
segments should be empty most of the time.
Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
Map<Integer, Integer> poolToSegmentCount = new HashMap<>();
- boolean useFixedReplica = isUseFixedReplica(ctx.getQueryOptions());
+ boolean useFixedReplica = ctx.isUseFixedReplica();
Integer numReplicaGroupsToQuery =
QueryOptionsUtils.getNumReplicaGroupsToQuery(ctx.getQueryOptions());
int numReplicaGroups = numReplicaGroupsToQuery != null ?
numReplicaGroupsToQuery : 1;
int replicaOffset = 0;
for (String segment : segments) {
List<SegmentInstanceCandidate> candidates =
segmentStates.getCandidates(segment);
- // NOTE: candidates can be null when there is no enabled instances for
the segment, or the instance selector has
+ // NOTE: candidates can be null when there are no enabled instances for
the segment, or the instance selector has
// not been updated (we update all components for routing in sequence)
if (candidates == null) {
continue;
}
- // Round robin selection.
- int numCandidates = candidates.size();
- int instanceIdx;
+ // Round-robin selection (default behavior)
+ int numCandidates = candidates.size();
+ int instanceIdx = (requestId + replicaOffset) % numCandidates;
+ SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
if (useFixedReplica) {
- // candidates array is always sorted
- instanceIdx = (_tableNameHashForFixedReplicaRouting + replicaOffset) %
numCandidates;
- } else {
- instanceIdx = (requestId + replicaOffset) % numCandidates;
+ // Adaptive Server Selection cannot be used with fixed replica routing.
+ // The candidates array is always sorted
+ selectedInstance =
candidates.get((_tableNameHashForFixedReplicaRouting + replicaOffset) %
numCandidates);
+ } else if (MapUtils.isNotEmpty(serverRankMap)) {
+ // Adaptive Server Selection is enabled.
+ // Use the instance with the best rank if all servers have stats
populated, else use the round-robin selected
+ // instance
+ selectedInstance = candidates.stream()
+ .anyMatch(candidate ->
!serverRankMap.containsKey(candidate.getInstance()))
+ ? selectedInstance
+ : candidates.stream()
+ .min(Comparator.comparingInt(candidate ->
serverRankMap.get(candidate.getInstance())))
+ .orElse(selectedInstance);
}
- SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
poolToSegmentCount.merge(selectedInstance.getPool(), 1, Integer::sum);
// This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
// broker or server can skip it upon any issue to process it.
@@ -141,51 +152,6 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
}
- private Pair<Map<String, String>, Map<String, String>>
selectServersUsingAdaptiveServerSelector(List<String> segments,
- int requestId, SegmentStates segmentStates, Map<String, Integer>
serverRankMap,
- ServerSelectionContext ctx) {
- Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
- // No need to adjust this map per total segment numbers, as optional
segments should be empty most of the time.
- Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
- Map<Integer, Integer> poolToSegmentCount = new HashMap<>();
- for (String segment : segments) {
- // NOTE: candidates can be null when there is no enabled instances for
the segment, or the instance selector has
- // not been updated (we update all components for routing in sequence)
- List<SegmentInstanceCandidate> candidates =
segmentStates.getCandidates(segment);
- if (candidates == null) {
- continue;
- }
-
- // Round Robin selection
- int roundRobinInstanceIdx = requestId % candidates.size();
- SegmentInstanceCandidate selectedInstance =
candidates.get(roundRobinInstanceIdx);
-
- // Adaptive Server Selection logic
- if (!serverRankMap.isEmpty()) {
- // Use instance with the best rank if all servers have stats
populated, if not use round-robin selected instance
- selectedInstance = candidates.stream()
- .anyMatch(candidate ->
!serverRankMap.containsKey(candidate.getInstance()))
- ? candidates.get(roundRobinInstanceIdx)
- : candidates.stream()
- .min(Comparator.comparingInt(candidate ->
serverRankMap.get(candidate.getInstance())))
- .orElse(candidates.get(roundRobinInstanceIdx));
- }
- poolToSegmentCount.merge(selectedInstance.getPool(), 1, Integer::sum);
- // This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
- // broker or server can skip it upon any issue to process it.
- if (selectedInstance.isOnline()) {
- segmentToSelectedInstanceMap.put(segment,
selectedInstance.getInstance());
- } else {
- optionalSegmentToInstanceMap.put(segment,
selectedInstance.getInstance());
- }
- }
- for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) {
- _brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES,
entry.getValue(),
- BrokerMetrics.getTagForPreferredPool(ctx.getQueryOptions()),
String.valueOf(entry.getKey()));
- }
- return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
- }
-
private List<SegmentInstanceCandidate>
fetchCandidateServersForQuery(List<String> segments,
SegmentStates segmentStates) {
Map<String, SegmentInstanceCandidate> candidateServers = new HashMap<>();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
index 6dcb88eb54c..7b251eb4397 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java
@@ -58,4 +58,10 @@ public class SegmentInstanceCandidate {
public int getPool() {
return _pool;
}
+
+ @Override
+ public String toString() {
+ return "SegmentInstanceCandidate{" + "_instance='" + _instance + '\'' + ",
_online=" + _online + ", _pool=" + _pool
+ + '}';
+ }
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
index c8809549b71..bfb56dd7bff 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStates.java
@@ -25,16 +25,15 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
-/**
- * The {@code SegmentStates} contains the candidate instances for each
segment, and the unavailable segments for routing
- * purpose.
- *
- * For old segments, the instance candidates should always have online flag
set to true.
- * For old segments without any enabled instance candidates, we report them as
unavailable segments.
- *
- * For new segments, the online flag within the instance candidates indicates
whether the instance is online or not.
- * We don't report new segments as unavailable segments because it is valid
for new segments to be offline.
- */
+/// The {@code SegmentStates} contains the candidate instances for each
segment, and the unavailable segments for
+/// routing purpose.
+///
+/// For old segments, the instance candidates should always have online flag
set to true.
+/// For old segments without any enabled instance candidates, we report them
as unavailable segments.
+///
+/// For new segments, the online flag within the instance candidates indicates
whether the instance is online or not.
+/// We don't report new segments as unavailable segments because it is valid
for new segments to be offline.
+///
@Immutable
public class SegmentStates {
private final Map<String, List<SegmentInstanceCandidate>>
_instanceCandidatesMap;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 527ee56dc6e..bbe08624457 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -58,14 +58,14 @@ public class BrokerMeter implements AbstractMetrics.Meter {
*/
public static final BrokerMeter POOL_QUERIES = create("POOL_QUERIES",
"routing", false);
/**
- * Number of segment selected per pool during query execution.
+ * Number of segments selected per pool during query execution.
* <p>
* This metric is not global and is attached to a particular pool.
- * Currently this counter include single stage queries only.
+ * Currently, this counter includes single-stage queries only.
* <p>
- * Let's say the query option orderedReferredPools is set and a few nodes in
the preferred pool are down.
- * The other metric {@link #POOL_QUERIES} shows the traffic are relatively
equal over pool.
- * This metric is still going to show that most of segments are still
selected from the preferred pool.
+ * Let's say the query option orderedPreferredPools is set and a few nodes
in the preferred pool are down.
+ * The other metric {@link #POOL_QUERIES} shows the traffic is relatively
equal over pool.
+ * This metric is still going to show that most of the segments are still
selected from the preferred pool.
*/
public static final BrokerMeter POOL_SEG_QUERIES =
create("POOL_SEG_QUERIES", "routing", false);
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]