This is an automated email from the ASF dual-hosted git repository.
jiaguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 604b244b64 Optimize Adaptive Server Selection (#13952)
604b244b64 is described below
commit 604b244b645dd033b2a1737e70dbca620aad42b0
Author: praveenc7 <[email protected]>
AuthorDate: Wed Sep 11 17:24:40 2024 -0700
Optimize Adaptive Server Selection (#13952)
* Improve time complexity for adaptiveServer
* Remove additional comments
* Remove additional comments
* review comments
* move hashset
* move hashset
* remove hashset
* functional unit test
* add test
* lint
* fix round-robin condition
* loop fix
---
.../ReplicaGroupInstanceSelector.java | 46 ++++++++---------
.../instanceselector/InstanceSelectorTest.java | 60 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 25 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index c766791f2d..374c5235f3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing.instanceselector;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -75,17 +76,18 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
SegmentStates segmentStates, Map<String, String> queryOptions) {
if (_adaptiveServerSelector != null) {
// Adaptive Server Selection is enabled.
- List<String> serverRankList = new ArrayList<>();
List<String> candidateServers = fetchCandidateServersForQuery(segments,
segmentStates);
// Fetch serverRankList before looping through all the segments. This is
important to make sure that we pick
// the least amount of instances for a query by referring to a single
snapshot of the rankings.
List<Pair<String, Double>> serverRankListWithScores =
_adaptiveServerSelector.fetchServerRankingsWithScores(candidateServers);
- for (Pair<String, Double> entry : serverRankListWithScores) {
- serverRankList.add(entry.getLeft());
+ Map<String, Integer> serverRankMap = new HashMap<>();
+ for (int idx = 0; idx < serverRankListWithScores.size(); idx++) {
+ Pair<String, Double> entry = serverRankListWithScores.get(idx);
+ serverRankMap.put(entry.getLeft(), idx);
}
- return selectServersUsingAdaptiveServerSelector(segments, requestId,
segmentStates, serverRankList);
+ return selectServersUsingAdaptiveServerSelector(segments, requestId,
segmentStates, serverRankMap);
} else {
// Adaptive Server Selection is NOT enabled.
return selectServersUsingRoundRobin(segments, requestId, segmentStates,
queryOptions);
@@ -135,7 +137,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
}
private Pair<Map<String, String>, Map<String, String>>
selectServersUsingAdaptiveServerSelector(List<String> segments,
- int requestId, SegmentStates segmentStates, List<String> serverRankList)
{
+ int requestId, SegmentStates segmentStates, Map<String, Integer>
serverRankMap) {
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
// No need to adjust this map per total segment numbers, as optional
segments should be empty most of the time.
Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
@@ -146,26 +148,20 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
if (candidates == null) {
continue;
}
- // Round Robin.
- int numCandidates = candidates.size();
- int instanceIdx = requestId % numCandidates;
- SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
- // Adaptive Server Selection
- // TODO: Support numReplicaGroupsToQuery with Adaptive Server Selection.
- if (!serverRankList.isEmpty()) {
- int minIdx = Integer.MAX_VALUE;
- for (SegmentInstanceCandidate candidate : candidates) {
- int idx = serverRankList.indexOf(candidate.getInstance());
- if (idx == -1) {
- // Let's use the round-robin approach until stats for all servers
are populated.
- selectedInstance = candidates.get(instanceIdx);
- break;
- }
- if (idx < minIdx) {
- minIdx = idx;
- selectedInstance = candidate;
- }
- }
+
+ // Round Robin selection
+ int roundRobinInstanceIdx = requestId % candidates.size();
+ SegmentInstanceCandidate selectedInstance =
candidates.get(roundRobinInstanceIdx);
+
+ // Adaptive Server Selection logic
+ if (!serverRankMap.isEmpty()) {
+ // Use instance with the best rank if all servers have stats
populated, if not use round-robin selected instance
+ selectedInstance = candidates.stream()
+ .anyMatch(candidate ->
!serverRankMap.containsKey(candidate.getInstance()))
+ ? candidates.get(roundRobinInstanceIdx)
+ : candidates.stream()
+ .min(Comparator.comparingInt(candidate ->
serverRankMap.get(candidate.getInstance())))
+ .orElse(candidates.get(roundRobinInstanceIdx));
}
// This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
// broker or server can skip it upon any issue to process it.
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index cc4f355369..daec569fb7 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -36,11 +36,13 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.adaptiveserverselector.HybridSelector;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -1957,4 +1959,62 @@ public class InstanceSelectorTest {
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
+
+ @Test
+ public void testReplicaGroupAdaptiveServerSelector() {
+ // Arrange: adaptive selection should pick the best-ranked candidate per segment, and fall back to round-robin when any candidate of a segment is unranked
+ String offlineTableName = "testTable_OFFLINE";
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+ HybridSelector hybridSelector = mock(HybridSelector.class);
+ ReplicaGroupInstanceSelector instanceSelector = new
ReplicaGroupInstanceSelector(
+ offlineTableName, propertyStore, brokerMetrics, hybridSelector,
Clock.systemUTC(), false, 300);
+
+ // Define instances and segments
+ String instance0 = "instance0";
+ String instance1 = "instance1";
+ String instance2 = "instance2";
+ String instance3 = "instance3";
+ String instance4 = "instance4";
+ String segment0 = "segment0";
+ String segment1 = "segment1";
+ String segment2 = "segment2";
+ List<String> segments = Arrays.asList(segment0, segment1, segment2);
+
+ // Define candidates for each segment (list order matters: the round-robin fallback picks index requestId % size)
+ Map<String, List<SegmentInstanceCandidate>> instanceCandidatesMap = new
HashMap<>();
+ // segment0 -> instance0, instance1
+ instanceCandidatesMap.put(segment0, Arrays.asList(new
SegmentInstanceCandidate(instance0, true),
+ new SegmentInstanceCandidate(instance1, true)));
+ // segment1 -> instance2, instance3
+ instanceCandidatesMap.put(segment1, Arrays.asList(new
SegmentInstanceCandidate(instance2, true),
+ new SegmentInstanceCandidate(instance3, true)));
+ // segment2 -> instance4, instance3 // instance4 is not in the hybrid
selector's server ranking
+ instanceCandidatesMap.put(segment2, Arrays.asList(new
SegmentInstanceCandidate(instance4, true),
+ new SegmentInstanceCandidate(instance3, true)));
+
+ // Define the segment states
+ SegmentStates segmentStates = new SegmentStates(instanceCandidatesMap, new
HashSet<>(segments), null);
+
+ // Define server rankings; the selector ranks servers by list position (instance3 best, instance0 worst)
+ List<Pair<String, Double>> serverRanks = Arrays.asList(
+ new ImmutablePair<>(instance3, 1.0),
+ new ImmutablePair<>(instance2, 2.0),
+ new ImmutablePair<>(instance1, 3.0),
+ new ImmutablePair<>(instance0, 4.0)
+ );
+
when(hybridSelector.fetchServerRankingsWithScores(any())).thenReturn(serverRanks);
+
+ // Act: requestId is 0, so any round-robin fallback picks candidate index 0
+ Pair<Map<String, String>, Map<String, String>> selectedResult =
instanceSelector.select(segments, 0,
+ segmentStates, null);
+
+ // Assert
+ Map<String, String> expectedSelection = new HashMap<>();
+ expectedSelection.put(segment0, instance1); // instance1 outranks instance0
+ expectedSelection.put(segment1, instance3); // instance3 is ranked best overall
+ expectedSelection.put(segment2, instance4); // instance4 is unranked -> round-robin index 0
+
+ assertEquals(selectedResult.getLeft(), expectedSelection);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org