This is an automated email from the ASF dual-hosted git repository.
yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e41bdd0c6f [multistage] Add Multi Stage Strict Replica Group Routing
Strategy (#9808)
e41bdd0c6f is described below
commit e41bdd0c6ff851ca65102fb7bdc83161753e839c
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Nov 29 22:25:23 2022 +0530
[multistage] Add Multi Stage Strict Replica Group Routing Strategy (#9808)
* Rebase on master
* Cleanup
* Add tests
* Fix test
* Address feedback
* Mark feature as Beta
* Address feedback
* Address feedback
* Rename to MultiStageReplicaGroup
---
.../broker/api/resources/PinotBrokerDebug.java | 15 +-
.../requesthandler/BaseBrokerRequestHandler.java | 4 +-
.../MultiStageBrokerRequestHandler.java | 2 +-
.../pinot/broker/routing/BrokerRoutingManager.java | 13 +-
.../instanceselector/BaseInstanceSelector.java | 10 +-
.../routing/instanceselector/InstanceSelector.java | 7 +-
.../instanceselector/InstanceSelectorFactory.java | 11 +-
.../MultiStageReplicaGroupSelector.java | 150 +++++++++++
.../broker/broker/HelixBrokerStarterTest.java | 7 +-
.../BaseBrokerRequestHandlerTest.java | 2 +-
.../instanceselector/InstanceSelectorTest.java | 277 +++++++++++++++------
.../apache/pinot/core/routing/RoutingManager.java | 2 +-
.../org/apache/pinot/query/QueryEnvironment.java | 16 +-
.../pinot/query/planner/logical/StagePlanner.java | 6 +-
.../apache/pinot/query/routing/WorkerManager.java | 16 +-
.../query/testutils/MockRoutingManagerFactory.java | 2 +-
.../pinot/spi/config/table/RoutingConfig.java | 1 +
17 files changed, 426 insertions(+), 115 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index daa17a0e5c..f9eb55b535 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.SwaggerDefinition;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -59,6 +60,9 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
// TODO: Add APIs to return the RoutingTable (with unavailable segments)
public class PinotBrokerDebug {
+ // Request ID is passed to the RoutingManager to rotate the selected
replica-group.
+ private final AtomicLong _requestIdGenerator = new AtomicLong();
+
@Inject
private BrokerRoutingManager _routingManager;
@@ -102,7 +106,7 @@ public class PinotBrokerDebug {
if (tableType != TableType.REALTIME) {
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
RoutingTable routingTable = _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
offlineTableName));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
offlineTableName), getRequestId());
if (routingTable != null) {
result.put(offlineTableName,
routingTable.getServerInstanceToSegmentsMap());
}
@@ -110,7 +114,7 @@ public class PinotBrokerDebug {
if (tableType != TableType.OFFLINE) {
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
RoutingTable routingTable = _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
realtimeTableName));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
realtimeTableName), getRequestId());
if (routingTable != null) {
result.put(realtimeTableName,
routingTable.getServerInstanceToSegmentsMap());
}
@@ -133,7 +137,8 @@ public class PinotBrokerDebug {
})
public Map<ServerInstance, List<String>> getRoutingTableForQuery(
@ApiParam(value = "SQL query (table name should have type suffix)")
@QueryParam("query") String query) {
- RoutingTable routingTable =
_routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query));
+ RoutingTable routingTable =
_routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query),
+ getRequestId());
if (routingTable != null) {
return routingTable.getServerInstanceToSegmentsMap();
} else {
@@ -157,4 +162,8 @@ public class PinotBrokerDebug {
public String getServerRoutingStats() {
return _serverRoutingStatsManager.getServerRoutingStatsStr();
}
+
+ private long getRequestId() {
+ return _requestIdGenerator.getAndIncrement();
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index ade86b61bd..ae88689a51 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -551,7 +551,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
int numPrunedSegmentsTotal = 0;
if (offlineBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
- RoutingTable routingTable =
_routingManager.getRoutingTable(offlineBrokerRequest);
+ RoutingTable routingTable =
_routingManager.getRoutingTable(offlineBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
@@ -567,7 +567,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
if (realtimeBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
- RoutingTable routingTable =
_routingManager.getRoutingTable(realtimeBrokerRequest);
+ RoutingTable routingTable =
_routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 0b1a67efbb..75e8d8d3f0 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -144,7 +144,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return constructMultistageExplainPlan(query, plan);
case SELECT:
default:
- queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions);
+ queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions,
requestId);
break;
}
} catch (Exception e) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index ee0982606d..883e9cfb02 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
* <li>{@link #removeRouting(String)}: Removes the routing for a table</li>
* <li>{@link #refreshSegment(String, String)}: Refreshes the metadata for a
segment</li>
* <li>{@link #routingExists(String)}: Returns whether the routing exists
for a table</li>
- * <li>{@link #getRoutingTable(BrokerRequest)}: Returns the routing table
for a query</li>
+ * <li>{@link #getRoutingTable(BrokerRequest, long)}: Returns the routing
table for a query</li>
* <li>{@link #getTimeBoundaryInfo(String)}: Returns the time boundary info
for a table</li>
* <li>{@link #getQueryTimeoutMs(String)}: Returns the table-level query
timeout in milliseconds for a table</li>
* </ul>
@@ -437,7 +437,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
AdaptiveServerSelector adaptiveServerSelector =
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
InstanceSelector instanceSelector =
- InstanceSelectorFactory.getInstanceSelector(tableConfig,
_brokerMetrics, adaptiveServerSelector);
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
+ adaptiveServerSelector);
instanceSelector.init(_routableServers, idealState, externalView,
preSelectedOnlineSegments);
// Add time boundary manager if both offline and real-time part exist for
a hybrid table
@@ -567,13 +568,13 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
*/
@Nullable
@Override
- public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long
requestId) {
String tableNameWithType = brokerRequest.getQuerySource().getTableName();
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
if (routingEntry == null) {
return null;
}
- InstanceSelector.SelectionResult selectionResult =
routingEntry.calculateRouting(brokerRequest);
+ InstanceSelector.SelectionResult selectionResult =
routingEntry.calculateRouting(brokerRequest, requestId);
Map<String, String> segmentToInstanceMap =
selectionResult.getSegmentToInstanceMap();
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new
HashMap<>();
for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
@@ -717,7 +718,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
}
- InstanceSelector.SelectionResult calculateRouting(BrokerRequest
brokerRequest) {
+ InstanceSelector.SelectionResult calculateRouting(BrokerRequest
brokerRequest, long requestId) {
Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
int numTotalSelectedSegments = selectedSegments.size();
if (!selectedSegments.isEmpty()) {
@@ -728,7 +729,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
int numPrunedSegments = numTotalSelectedSegments -
selectedSegments.size();
if (!selectedSegments.isEmpty()) {
InstanceSelector.SelectionResult selectionResult =
_instanceSelector.select(brokerRequest,
- new ArrayList<>(selectedSegments));
+ new ArrayList<>(selectedSegments), requestId);
selectionResult.setNumPrunedSegments(numPrunedSegments);
return selectionResult;
} else {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 9b92d6031c..9325035aca 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -50,9 +49,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
// To prevent int overflow, reset the request id once it reaches this value
- private static final int MAX_REQUEST_ID = 1_000_000_000;
+ private static final long MAX_REQUEST_ID = 1_000_000_000;
- private final AtomicLong _requestId = new AtomicLong();
private final String _tableNameWithType;
private final BrokerMetrics _brokerMetrics;
protected final AdaptiveServerSelector _adaptiveServerSelector;
@@ -267,13 +265,13 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
@Override
- public SelectionResult select(BrokerRequest brokerRequest, List<String>
segments) {
- int requestId = (int) (_requestId.getAndIncrement() % MAX_REQUEST_ID);
+ public SelectionResult select(BrokerRequest brokerRequest, List<String>
segments, long requestId) {
Map<String, String> queryOptions = (brokerRequest.getPinotQuery() != null
&& brokerRequest.getPinotQuery().getQueryOptions() != null)
? brokerRequest.getPinotQuery().getQueryOptions()
: Collections.emptyMap();
- Map<String, String> segmentToInstanceMap = select(segments, requestId,
_segmentToEnabledInstancesMap,
+ int requestIdInt = (int) (requestId % MAX_REQUEST_ID);
+ Map<String, String> segmentToInstanceMap = select(segments, requestIdInt,
_segmentToEnabledInstancesMap,
queryOptions);
Set<String> unavailableSegments = _unavailableSegments;
if (unavailableSegments.isEmpty()) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 4edaf69b0a..4c96007fd6 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -55,8 +55,13 @@ public interface InstanceSelector {
* Selects the server instances for the given segments queried by the given
broker request, returns a map from segment
* to selected server instance hosting the segment and a set of unavailable
segments (no enabled instance or all
* enabled instances are in ERROR state).
+ *
+ * @param brokerRequest BrokerRequest for the query
+ * @param segments segments for which instance needs to be selected
+ * @param requestId requestId generated by the Broker for a query
+ * @return instance of SelectionResult which describes the instance to pick
for a given segment
*/
- SelectionResult select(BrokerRequest brokerRequest, List<String> segments);
+ SelectionResult select(BrokerRequest brokerRequest, List<String> segments,
long requestId);
class SelectionResult {
private final Map<String, String> _segmentToInstanceMap;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 0ef46d7b7a..8cc9f260f8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -19,6 +19,8 @@
package org.apache.pinot.broker.routing.instanceselector;
import javax.annotation.Nullable;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -37,7 +39,8 @@ public class InstanceSelectorFactory {
public static final String LEGACY_REPLICA_GROUP_OFFLINE_ROUTING =
"PartitionAwareOffline";
public static final String LEGACY_REPLICA_GROUP_REALTIME_ROUTING =
"PartitionAwareRealtime";
- public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
BrokerMetrics brokerMetrics,
+ public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics,
@Nullable AdaptiveServerSelector adaptiveServerSelector) {
String tableNameWithType = tableConfig.getTableName();
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
@@ -55,6 +58,12 @@ public class InstanceSelectorFactory {
LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
return new StrictReplicaGroupInstanceSelector(tableNameWithType,
brokerMetrics, adaptiveServerSelector);
}
+ if
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
+ routingConfig.getInstanceSelectorType())) {
+ LOGGER.info("Using {} for table: {}",
routingConfig.getInstanceSelectorType(), tableNameWithType);
+ return new MultiStageReplicaGroupSelector(tableNameWithType,
propertyStore, brokerMetrics,
+ adaptiveServerSelector);
+ }
}
return new BalancedInstanceSelector(tableNameWithType, brokerMetrics,
adaptiveServerSelector);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
new file mode 100644
index 0000000000..0a6d66510c
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Instance selector for multi-stage queries which can ensure that Colocated
Tables always leverage Colocated Join
+ * whenever possible. To achieve this, this instance-selector uses
InstancePartitions (IP) to determine replica-groups,
+ * as opposed to IdealState used by other instance-selectors. Moreover, this
also uses the requestId generated by
+ * Pinot broker to determine the replica-group picked for each table involved
in the query, as opposed to using a
+ * member variable. There may be scenarios where an instance in the chosen
replica-group is down. In that case, this
+ * strategy will try to pick another replica-group. For realtime tables, this
strategy uses only CONSUMING partitions.
+ * This feature is in <strong>Beta</strong>.
+ */
+public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MultiStageReplicaGroupSelector.class);
+
+ private final String _tableNameWithType;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private InstancePartitions _instancePartitions;
+
+ public MultiStageReplicaGroupSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector) {
+ super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
+ _tableNameWithType = tableNameWithType;
+ _propertyStore = propertyStore;
+ }
+
+ @Override
+ public void init(Set<String> enabledInstances, IdealState idealState,
ExternalView externalView,
+ Set<String> onlineSegments) {
+ super.init(enabledInstances, idealState, externalView, onlineSegments);
+ _instancePartitions = getInstancePartitions();
+ }
+
+ @Override
+ public void onInstancesChange(Set<String> enabledInstances, List<String>
changedInstances) {
+ super.onInstancesChange(enabledInstances, changedInstances);
+ _instancePartitions = getInstancePartitions();
+ }
+
+ @Override
+ public void onAssignmentChange(IdealState idealState, ExternalView
externalView, Set<String> onlineSegments) {
+ super.onAssignmentChange(idealState, externalView, onlineSegments);
+ _instancePartitions = getInstancePartitions();
+ }
+
+ @Override
+ Map<String, String> select(List<String> segments, int requestId,
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String,
String> queryOptions) {
+ // Snapshot the InstancePartitions reference to avoid a race-condition with
the event-listeners above, which may reassign the field.
+ InstancePartitions instancePartitions = _instancePartitions;
+ int replicaGroupSelected = requestId %
instancePartitions.getNumReplicaGroups();
+ for (int iteration = 0; iteration <
instancePartitions.getNumReplicaGroups(); iteration++) {
+ int replicaGroup = (replicaGroupSelected + iteration) %
instancePartitions.getNumReplicaGroups();
+ try {
+ return tryAssigning(segmentToEnabledInstancesMap, instancePartitions,
replicaGroup);
+ } catch (Exception e) {
+ LOGGER.warn("Unable to select replica-group {} for table: {}",
replicaGroup, _tableNameWithType, e);
+ }
+ }
+ throw new RuntimeException(String.format("Unable to find any replica-group
to serve table: %s",
+ _tableNameWithType));
+ }
+
+ /**
+ * Returns a map from the segmentName to the corresponding server in the
given replica-group. If a segment has no
+ * enabled instance in that replica-group, we throw an exception.
+ */
+ private Map<String, String> tryAssigning(Map<String, List<String>>
segmentToEnabledInstancesMap,
+ InstancePartitions instancePartitions, int replicaId) {
+ Set<String> instanceLookUpSet = new HashSet<>();
+ for (int partition = 0; partition < instancePartitions.getNumPartitions();
partition++) {
+ List<String> instances = instancePartitions.getInstances(partition,
replicaId);
+ instanceLookUpSet.addAll(instances);
+ }
+ Map<String, String> result = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry :
segmentToEnabledInstancesMap.entrySet()) {
+ String segmentName = entry.getKey();
+ boolean found = false;
+ for (String enabledInstanceForSegment : entry.getValue()) {
+ if (instanceLookUpSet.contains(enabledInstanceForSegment)) {
+ found = true;
+ result.put(segmentName, enabledInstanceForSegment);
+ break;
+ }
+ }
+ if (!found) {
+ throw new RuntimeException(String.format("Unable to find an enabled
instance for segment: %s", segmentName));
+ }
+ }
+ return result;
+ }
+
+ @VisibleForTesting
+ protected InstancePartitions getInstancePartitions() {
+ // TODO: Evaluate whether we need to provide support for COMPLETE
partitions.
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(_tableNameWithType);
+ Preconditions.checkNotNull(tableType);
+ InstancePartitions instancePartitions = null;
+ if (tableType.equals(TableType.OFFLINE)) {
+ instancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+
InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType,
tableType.name()));
+ } else {
+ instancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+ InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType,
+ InstancePartitionsType.CONSUMING.name()));
+ }
+ Preconditions.checkNotNull(instancePartitions);
+ return instancePartitions;
+ }
+}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b2abbd59a9..53e75d0557 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -153,7 +153,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
assertTrue(routingManager.routingExists(REALTIME_TABLE_NAME));
BrokerRequest brokerRequest =
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
OFFLINE_TABLE_NAME);
- RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest);
+ RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest,
0);
assertNotNull(routingTable);
assertEquals(routingTable.getServerInstanceToSegmentsMap().size(),
NUM_SERVERS);
assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(),
NUM_OFFLINE_SEGMENTS);
@@ -164,8 +164,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
"downloadUrl");
TestUtils.waitForCondition(aVoid ->
-
routingManager.getRoutingTable(brokerRequest).getServerInstanceToSegmentsMap().values().iterator().next().size()
- == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new
segment into the routing table");
+ routingManager.getRoutingTable(brokerRequest,
0).getServerInstanceToSegmentsMap()
+ .values().iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1,
30_000L, "Failed to add the new segment "
+ + "into the routing table");
// Add a new table with different broker tenant
String newRawTableName = "newTable";
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index cff4eff193..88f72600e5 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -199,7 +199,7 @@ public class BaseBrokerRequestHandlerTest {
RoutingTable rt = mock(RoutingTable.class);
when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
Collections.singletonList("segment01")));
- when(routingManager.getRoutingTable(any())).thenReturn(rt);
+ when(routingManager.getRoutingTable(any(),
Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 5025782a49..07efbe101a 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -28,7 +31,10 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
@@ -41,10 +47,13 @@ import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.Segmen
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class InstanceSelectorTest {
@@ -55,26 +64,27 @@ public class InstanceSelectorTest {
public void testInstanceSelectorFactory() {
TableConfig tableConfig = mock(TableConfig.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
AdaptiveServerSelector adaptiveServerSelector = null;
// Routing config is missing
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof BalancedInstanceSelector);
// Instance selector type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof BalancedInstanceSelector);
// Replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
// Strict replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof StrictReplicaGroupInstanceSelector);
// Should be backward-compatible with legacy config
@@ -82,12 +92,12 @@ public class InstanceSelectorTest {
when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
}
@@ -169,6 +179,8 @@ public class InstanceSelectorTest {
replicaGroupInstanceSelector.init(enabledInstances, idealState,
externalView, onlineSegments);
strictReplicaGroupInstanceSelector.init(enabledInstances, idealState,
externalView, onlineSegments);
+ int requestId = 0;
+
// For the 1st request:
// BalancedInstanceSelector:
// segment0 -> instance0
@@ -189,7 +201,8 @@ public class InstanceSelectorTest {
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
- InstanceSelector.SelectionResult selectionResult =
balancedInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult =
balancedInstanceSelector.select(brokerRequest, segments,
+ requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
Map<String, String> expectedReplicaGroupInstanceSelectorResult = new
HashMap<>();
@@ -197,10 +210,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance0);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -215,12 +228,13 @@ public class InstanceSelectorTest {
// segment1 -> instance2
// segment2 -> instance3
// segment3 -> instance3
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment0, instance2);
expectedBalancedInstanceSelectorResult.put(segment1, instance0);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -228,10 +242,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -252,12 +266,13 @@ public class InstanceSelectorTest {
// segment1 -> instance2
// segment2 -> instance1
// segment3 -> instance1
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment0, instance2);
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -265,10 +280,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -283,12 +298,13 @@ public class InstanceSelectorTest {
// segment1 -> instance2
// segment2 -> instance3
// segment3 -> instance3
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment0, instance2);
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -296,10 +312,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -326,21 +342,22 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> null
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -355,21 +372,22 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> null
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -389,12 +407,13 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -402,10 +421,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -420,12 +439,13 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -433,10 +453,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -457,12 +477,13 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> instance0
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance0);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -470,10 +491,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -488,12 +509,13 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
expectedBalancedInstanceSelectorResult.put(segment4, instance0);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -501,10 +523,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -534,12 +556,13 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -547,7 +570,7 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
Map<String, String> expectedStrictReplicaGroupInstanceSelectorResult = new
HashMap<>();
@@ -555,7 +578,7 @@ public class InstanceSelectorTest {
expectedStrictReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedStrictReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedStrictReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedStrictReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -570,12 +593,13 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
expectedBalancedInstanceSelectorResult.put(segment4, instance0);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -583,10 +607,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
@@ -668,12 +692,11 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segments.get(9), instance1);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(10),
instance0);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(11),
instance1);
- InstanceSelector.SelectionResult selectionResult =
replicaGroupInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult =
replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
-
@Test
public void
testReplicaGroupInstanceSelectorNumReplicaGroupsToQueryGreaterThanReplicas() {
String offlineTableName = "testTable_OFFLINE";
@@ -752,7 +775,7 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segments.get(9), instance0);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(10),
instance1);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(11),
instance2);
- InstanceSelector.SelectionResult selectionResult =
replicaGroupInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult =
replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
@@ -814,14 +837,126 @@ public class InstanceSelectorTest {
for (String segment: segments) {
expectedReplicaGroupInstanceSelectorResult.put(segment, instance0);
}
- InstanceSelector.SelectionResult selectionResult =
replicaGroupInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult =
replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
for (String segment: segments) {
expectedReplicaGroupInstanceSelectorResult.put(segment, instance1);
}
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest,
segments, 1);
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
+ }
+
+ @Test
+ public void testMultiStageStrictReplicaGroupSelector() {
+ String offlineTableName = "testTable_OFFLINE";
+ // Create instance-partitions with two replica-groups and 1 partition.
Each replica-group has 2 instances.
+ List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1");
+ List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3");
+ Map<String, List<String>> partitionToInstances = ImmutableMap.of(
+ "0_0", replicaGroup0,
+ "0_1", replicaGroup1);
+ InstancePartitions instancePartitions = new
InstancePartitions(offlineTableName);
+ instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
+ instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
+ BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+ BrokerRequest brokerRequest = mock(BrokerRequest.class);
+ PinotQuery pinotQuery = mock(PinotQuery.class);
+ Map<String, String> queryOptions = new HashMap<>();
+
+ when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
+ when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
(ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+
+ MultiStageReplicaGroupSelector multiStageSelector =
+ new MultiStageReplicaGroupSelector(offlineTableName, propertyStore,
brokerMetrics, null);
+ multiStageSelector = spy(multiStageSelector);
+
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
+
+ List<String> enabledInstances = new ArrayList<>();
+ IdealState idealState = new IdealState(offlineTableName);
+ Map<String, Map<String, String>> idealStateSegmentAssignment =
idealState.getRecord().getMapFields();
+ ExternalView externalView = new ExternalView(offlineTableName);
+ Map<String, Map<String, String>> externalViewSegmentAssignment =
externalView.getRecord().getMapFields();
+ Set<String> onlineSegments = new HashSet<>();
+
+ // Mark all instances as enabled
+ for (int i = 0; i < 4; i++) {
+ enabledInstances.add(String.format("instance-%d", i));
+ }
+
+ List<String> segments = getSegments();
+
+ // Create two idealState and externalView maps: one for even-numbered
segments and one for odd-numbered segments
+ Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+ Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+ Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
+ Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+
+ // instance-0 and instance-2 mirror each other in the two replica-groups.
Same for instance-1 and instance-3.
+ for (int i = 0; i < 4; i++) {
+ String instance = enabledInstances.get(i);
+ if (i % 2 == 0) {
+ idealStateInstanceStateMap0.put(instance, ONLINE);
+ externalViewInstanceStateMap0.put(instance, ONLINE);
+ } else {
+ idealStateInstanceStateMap1.put(instance, ONLINE);
+ externalViewInstanceStateMap1.put(instance, ONLINE);
+ }
+ }
+
+ // Even numbered segments get assigned to [instance-0, instance-2], and
odd numbered segments get assigned to
+ // [instance-1, instance-3].
+ for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+ String segment = segments.get(segmentNum);
+ if (segmentNum % 2 == 0) {
+ idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0);
+ externalViewSegmentAssignment.put(segment,
externalViewInstanceStateMap0);
+ } else {
+ idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1);
+ externalViewSegmentAssignment.put(segment,
externalViewInstanceStateMap1);
+ }
+ onlineSegments.add(segment);
+ }
+
+ multiStageSelector.init(new HashSet<>(enabledInstances), idealState,
externalView, onlineSegments);
+
+ // Using requestId=0 should select replica-group 0. Even segments get
assigned to instance-0 and odd segments get
+ // assigned to instance-1.
+ Map<String, String> expectedReplicaGroupInstanceSelectorResult = new
HashMap<>();
+ for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+ expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum),
replicaGroup0.get(segmentNum % 2));
+ }
+ InstanceSelector.SelectionResult selectionResult =
multiStageSelector.select(brokerRequest, segments, 0);
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
+
+ // Using same requestId again should return the same selection
+ selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
+
+ // Using requestId=1 should select replica-group 1
+ expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+ for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+ expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum),
replicaGroup1.get(segmentNum % 2));
+ }
+ selectionResult = multiStageSelector.select(brokerRequest, segments, 1);
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
+
+ // If instance-0 is down, replica-group 1 should be picked even with
requestId=0
+ enabledInstances.remove("instance-0");
+ multiStageSelector.init(new HashSet<>(enabledInstances), idealState,
externalView, onlineSegments);
+ selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(),
expectedReplicaGroupInstanceSelectorResult);
+
+ // If instance-2 also goes down, no replica-group is eligible
+ enabledInstances.remove("instance-2");
+ multiStageSelector.init(new HashSet<>(enabledInstances), idealState,
externalView, onlineSegments);
+ try {
+ multiStageSelector.select(brokerRequest, segments, 0);
+ fail("Method call above should have failed");
+ } catch (Exception ignored) {
+ }
}
@Test
@@ -880,10 +1015,10 @@ public class InstanceSelectorTest {
PinotQuery pinotQuery = mock(PinotQuery.class);
when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
when(pinotQuery.getQueryOptions()).thenReturn(null);
- InstanceSelector.SelectionResult selectionResult =
balancedInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult =
balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
@@ -904,10 +1039,10 @@ public class InstanceSelectorTest {
enabledInstances.add(errorInstance);
balancedInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(errorInstance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(errorInstance));
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
@@ -925,10 +1060,10 @@ public class InstanceSelectorTest {
enabledInstances.add(instance);
balancedInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(instance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(instance));
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -936,10 +1071,10 @@ public class InstanceSelectorTest {
idealStateInstanceStateMap.put(instance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -958,10 +1093,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(instance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -969,10 +1104,10 @@ public class InstanceSelectorTest {
idealStateInstanceStateMap.remove(instance);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
@@ -993,10 +1128,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(errorInstance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
@@ -1017,10 +1152,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(errorInstance, ERROR);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -1038,10 +1173,10 @@ public class InstanceSelectorTest {
enabledInstances.remove(instance);
balancedInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(instance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(instance));
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
@@ -1061,10 +1196,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap0.put(errorInstance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
assertEquals(selectionResult.getUnavailableSegments(),
Collections.singletonList(segment1));
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
assertEquals(selectionResult.getUnavailableSegments(),
Collections.singletonList(segment1));
@@ -1082,10 +1217,10 @@ public class InstanceSelectorTest {
enabledInstances.remove(errorInstance);
balancedInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(errorInstance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances,
Collections.singletonList(errorInstance));
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
@@ -1106,10 +1241,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(instance, CONSUMING);
balancedInstanceSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState,
externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest,
segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest,
segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
- selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult =
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(),
Arrays.asList(segment0, segment1));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index db535dcaa6..857f0207da 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -50,7 +50,7 @@ public interface RoutingManager {
* @param brokerRequest the broker request constructed from a query.
* @return the route table.
*/
- RoutingTable getRoutingTable(BrokerRequest brokerRequest);
+ RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);
/**
* Validate routing exist for a table
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 814837ec15..d84a70d3f8 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -125,11 +125,11 @@ public class QueryEnvironment {
* @param sqlNodeAndOptions parsed SQL query.
* @return a dispatchable query plan
*/
- public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions
sqlNodeAndOptions) {
+ public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions
sqlNodeAndOptions, long requestId) {
try (PlannerContext plannerContext = new PlannerContext(_config,
_catalogReader, _typeFactory, _hepProgram)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(),
plannerContext);
- return toDispatchablePlan(relRoot, plannerContext);
+ return toDispatchablePlan(relRoot, plannerContext, requestId);
} catch (CalciteContextException e) {
throw new RuntimeException("Error composing query plan for '" + sqlQuery
+ "': " + e.getMessage() + "'", e);
@@ -141,9 +141,9 @@ public class QueryEnvironment {
/**
* Explain a SQL query.
*
- * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions)},
this API runs the query compilation.
- * But it doesn't run the distributed {@link QueryPlan} generation, instead
it only returns the explained logical
- * plan.
+ * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions,
long)}, this API runs the query
+ * compilation. But it doesn't run the distributed {@link QueryPlan}
generation, instead it only returns the
+ * explained logical plan.
*
* @param sqlQuery SQL query string.
* @param sqlNodeAndOptions parsed SQL query.
@@ -165,7 +165,7 @@ public class QueryEnvironment {
@VisibleForTesting
public QueryPlan planQuery(String sqlQuery) {
- return planQuery(sqlQuery,
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery));
+ return planQuery(sqlQuery,
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), 0);
}
@VisibleForTesting
@@ -219,9 +219,9 @@ public class QueryEnvironment {
}
}
- private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext
plannerContext) {
+ private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext
plannerContext, long requestId) {
// 5. construct a dispatchable query plan.
- StagePlanner queryStagePlanner = new StagePlanner(plannerContext,
_workerManager);
+ StagePlanner queryStagePlanner = new StagePlanner(plannerContext,
_workerManager, requestId);
return queryStagePlanner.makePlan(relRoot);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 2d61856c85..5f46b23d26 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -44,10 +44,12 @@ public class StagePlanner {
private final PlannerContext _plannerContext; // DO NOT REMOVE.
private final WorkerManager _workerManager;
private int _stageIdCounter;
+ private long _requestId;
- public StagePlanner(PlannerContext plannerContext, WorkerManager
workerManager) {
+ public StagePlanner(PlannerContext plannerContext, WorkerManager
workerManager, long requestId) {
_plannerContext = plannerContext;
_workerManager = workerManager;
+ _requestId = requestId;
}
/**
@@ -79,7 +81,7 @@ public class StagePlanner {
// assign workers to each stage.
for (Map.Entry<Integer, StageMetadata> e :
queryPlan.getStageMetadataMap().entrySet()) {
- _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+ _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId);
}
return queryPlan;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 112aec606b..42bb19d269 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -58,12 +58,12 @@ public class WorkerManager {
_routingManager = routingManager;
}
- public void assignWorkerToStage(int stageId, StageMetadata stageMetadata) {
+ public void assignWorkerToStage(int stageId, StageMetadata stageMetadata,
long requestId) {
List<String> scannedTables = stageMetadata.getScannedTables();
if (scannedTables.size() == 1) {
// table scan stage, need to attach server as well as segment info for
each physical table type.
String logicalTableName = scannedTables.get(0);
- Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName);
+ Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName, requestId);
if (routingTableMap.size() == 0) {
throw new IllegalArgumentException("Unable to find routing entries for
table: " + logicalTableName);
}
@@ -124,22 +124,22 @@ public class WorkerManager {
* @param logicalTableName it can either be a hybrid table name or a
physical table name with table type.
* @return keyed-map from table type(s) to routing table(s).
*/
- private Map<String, RoutingTable> getRoutingTable(String logicalTableName) {
+ private Map<String, RoutingTable> getRoutingTable(String logicalTableName,
long requestId) {
String rawTableName =
TableNameBuilder.extractRawTableName(logicalTableName);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(logicalTableName);
Map<String, RoutingTable> routingTableMap = new HashMap<>();
RoutingTable routingTable;
if (tableType == null) {
- routingTable = getRoutingTable(rawTableName, TableType.OFFLINE);
+ routingTable = getRoutingTable(rawTableName, TableType.OFFLINE,
requestId);
if (routingTable != null) {
routingTableMap.put(TableType.OFFLINE.name(), routingTable);
}
- routingTable = getRoutingTable(rawTableName, TableType.REALTIME);
+ routingTable = getRoutingTable(rawTableName, TableType.REALTIME,
requestId);
if (routingTable != null) {
routingTableMap.put(TableType.REALTIME.name(), routingTable);
}
} else {
- routingTable = getRoutingTable(logicalTableName, tableType);
+ routingTable = getRoutingTable(logicalTableName, tableType, requestId);
if (routingTable != null) {
routingTableMap.put(tableType.name(), routingTable);
}
@@ -147,10 +147,10 @@ public class WorkerManager {
return routingTableMap;
}
- private RoutingTable getRoutingTable(String tableName, TableType tableType) {
+ private RoutingTable getRoutingTable(String tableName, TableType tableType,
long requestId) {
String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(
TableNameBuilder.extractRawTableName(tableName));
return _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
tableNameWithType));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
tableNameWithType), requestId);
}
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 9d2d5e1a22..5e75de0455 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -144,7 +144,7 @@ public class MockRoutingManagerFactory {
}
@Override
- public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long
requestId) {
String tableName =
brokerRequest.getPinotQuery().getDataSource().getTableName();
return _routingTableMap.getOrDefault(tableName,
_routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 2c238aa287..8af5773675 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -31,6 +31,7 @@ public class RoutingConfig extends BaseJsonConfig {
public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE =
"replicaGroup";
public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE =
"strictReplicaGroup";
+ public static final String MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE =
"multiStageReplicaGroup";
// Replaced by _segmentPrunerTypes and _instanceSelectorType
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]