This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 18adba0c91 [multistage] Make Intermediate Stage Worker Assignment Tenant Aware (#10617)
18adba0c91 is described below
commit 18adba0c91f7b28dec829b5882420b6d7ff7a833
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Wed Apr 19 10:10:39 2023 -0700
[multistage] Make Intermediate Stage Worker Assignment Tenant Aware (#10617)
* Make Intermediate Stage Worker Assignment Tenant Aware
* Address review comments
---
.../MultiStageBrokerRequestHandler.java | 13 +---
.../pinot/broker/routing/BrokerRoutingManager.java | 78 +++++++++++++++++++++-
.../api/resources/PinotQueryResource.java | 19 ++----
.../apache/pinot/core/routing/RoutingManager.java | 10 ++-
.../org/apache/pinot/query/QueryEnvironment.java | 39 ++++++++---
.../pinot/query/planner/logical/StagePlanner.java | 5 +-
.../planner/physical/DispatchablePlanContext.java | 10 ++-
.../planner/physical/DispatchablePlanVisitor.java | 2 +-
.../apache/pinot/query/routing/WorkerManager.java | 60 +++++++++++++++--
.../query/testutils/MockRoutingManagerFactory.java | 5 ++
10 files changed, 195 insertions(+), 46 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 9e74dcf400..249772d56e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -21,15 +21,12 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -163,8 +160,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
case EXPLAIN:
queryPlanResult = _queryEnvironment.explainQuery(query,
sqlNodeAndOptions);
String plan = queryPlanResult.getExplainPlan();
- RelNode explainRelRoot = queryPlanResult.getRelRoot();
- if (!hasTableAccess(requesterIdentity,
getTableNamesFromRelRoot(explainRelRoot), requestId, requestContext)) {
+ Set<String> tableNames = queryPlanResult.getTableNames();
+ if (!hasTableAccess(requesterIdentity, tableNames, requestId,
requestContext)) {
return new
BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
}
@@ -183,7 +180,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
QueryPlan queryPlan = queryPlanResult.getQueryPlan();
- Set<String> tableNames =
getTableNamesFromRelRoot(queryPlanResult.getRelRoot());
+ Set<String> tableNames = queryPlanResult.getTableNames();
// Compilation Time. This includes the time taken for parsing, compiling,
create stage plans and assigning workers.
long compilationEndTimeNs = System.nanoTime();
@@ -288,10 +285,6 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return false;
}
- private Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
- return new HashSet<>(RelOptUtil.findAllTableQualifiedNames(relRoot));
- }
-
private void updatePhaseTimingForTables(Set<String> tableNames,
BrokerQueryPhase phase, long time) {
for (String tableName : tableNames) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 93edc45f52..d62dc639fb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -57,6 +57,8 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -102,11 +104,16 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
private final ServerRoutingStatsManager _serverRoutingStatsManager;
private final PinotConfiguration _pinotConfig;
+ // Map that contains the tableNameWithType as key and the enabled
serverInstances that are tagged with the table's
+ // tenant.
+ private final Map<String, Map<String, ServerInstance>>
_tableTenantServersMap = new ConcurrentHashMap<>();
+
private BaseDataAccessor<ZNRecord> _zkDataAccessor;
private String _externalViewPathPrefix;
private String _idealStatePathPrefix;
private String _instanceConfigsPath;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private HelixManager _helixManager;
private Set<String> _routableServers;
@@ -125,6 +132,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
_idealStatePathPrefix =
helixDataAccessor.keyBuilder().idealStates().getPath() + "/";
_instanceConfigsPath =
helixDataAccessor.keyBuilder().instanceConfigs().getPath();
_propertyStore = helixManager.getHelixPropertyStore();
+ _helixManager = helixManager;
}
@Override
@@ -244,7 +252,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
enabledServers.add(instanceId);
// Always refresh the server instance with the latest instance config
in case it changes
- ServerInstance serverInstance = new ServerInstance(new
InstanceConfig(instanceConfigZNRecord));
+ InstanceConfig instanceConfig = new
InstanceConfig(instanceConfigZNRecord);
+ ServerInstance serverInstance = new ServerInstance(instanceConfig);
if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null)
{
newEnabledServers.add(instanceId);
@@ -252,6 +261,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
if (_excludedServers.remove(instanceId)) {
LOGGER.info("Got excluded server: {} re-enabled, including it into
the routing", instanceId);
}
+
+ addNewServerToTableTenantServerMap(instanceId, serverInstance,
instanceConfig);
}
}
}
@@ -259,6 +270,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
for (String instance : _enabledServerInstanceMap.keySet()) {
if (!enabledServers.contains(instance)) {
newDisabledServers.add(instance);
+ deleteServerFromTableTenantServerMap(instance);
}
}
@@ -408,6 +420,9 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
+ // Build a mapping from the table to the list of servers assigned to the
table's tenant.
+ buildTableTenantServerMap(tableNameWithType, tableConfig);
+
String idealStatePath = getIdealStatePath(tableNameWithType);
IdealState idealState = getIdealState(idealStatePath);
Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
@@ -545,6 +560,10 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
} else {
LOGGER.warn("Routing does not exist for table: {}, skipping removing
routing", tableNameWithType);
}
+
+ if (_tableTenantServersMap.remove(tableNameWithType) != null) {
+ LOGGER.info("Removed tenant servers for table: {}", tableNameWithType);
+ }
}
/**
@@ -603,6 +622,12 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
return _enabledServerInstanceMap;
}
+  /**
+   * Returns the enabled servers tagged with the server tenant of the given table.
+   *
+   * <p>The lookup is a single {@code get} on the concurrent map. A separate existence check followed by a read
+   * could observe a concurrent removal of the table's entry in between and return {@code null}.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @return the recorded tenant servers for the table, or an empty map if none are recorded
+   */
+  @Override
+  public Map<String, ServerInstance> getEnabledServersForTableTenant(String tableNameWithType) {
+    Map<String, ServerInstance> tenantServers = _tableTenantServersMap.get(tableNameWithType);
+    return tenantServers != null ? tenantServers : new HashMap<>();
+  }
+
+
private String getIdealStatePath(String tableNameWithType) {
return _idealStatePathPrefix + tableNameWithType;
}
@@ -744,4 +769,55 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
}
}
+
+  /**
+   * (Re)builds the tenant-server entry for the given table from the current Helix instance configs.
+   *
+   * <p>Only instances that carry the table's server tag AND are currently in {@code _enabledServerInstanceMap}
+   * are recorded. Servers are only removed from these entries when they leave the enabled map, so an instance that
+   * is already disabled at build time would otherwise stay in the entry indefinitely.
+   *
+   * <p>The per-table map is a {@link ConcurrentHashMap} because it is mutated in place as servers are enabled or
+   * disabled, while {@code getEnabledServersForTableTenant} hands the same map out to readers.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param tableConfig config of that table, used to derive its server tag
+   */
+  private void buildTableTenantServerMap(String tableNameWithType, TableConfig tableConfig) {
+    String serverTag = getServerTagForTable(tableNameWithType, tableConfig);
+    List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs(_helixManager);
+    List<InstanceConfig> instanceConfigsWithTag =
+        HelixHelper.getInstancesConfigsWithTag(allInstanceConfigs, serverTag);
+    Map<String, ServerInstance> serverInstances = new ConcurrentHashMap<>();
+    for (InstanceConfig serverInstanceConfig : instanceConfigsWithTag) {
+      String instanceName = serverInstanceConfig.getInstanceName();
+      // Reuse the routing-side ServerInstance; skip tagged instances that are not currently enabled.
+      ServerInstance enabledServer = _enabledServerInstanceMap.get(instanceName);
+      if (enabledServer != null) {
+        serverInstances.put(instanceName, enabledServer);
+      }
+    }
+    _tableTenantServersMap.put(tableNameWithType, serverInstances);
+    LOGGER.info("Built map for table={} with {} server instances.", tableNameWithType, serverInstances.size());
+  }
+
+  /**
+   * Adds a newly enabled server to the tenant-server entry of every table whose server tag the server carries.
+   *
+   * <p>The table config is re-read for each recorded table. If it can no longer be found, that table is skipped
+   * with a warning. Otherwise the null would be dereferenced in {@code getServerTagForTable}, and the resulting
+   * exception would propagate out of the caller's loop over instance configs.
+   *
+   * @param instanceId id of the server being enabled
+   * @param serverInstance routing representation of the server
+   * @param instanceConfig Helix instance config of the server, used for its tags
+   */
+  private void addNewServerToTableTenantServerMap(String instanceId, ServerInstance serverInstance,
+      InstanceConfig instanceConfig) {
+    List<String> tags = instanceConfig.getTags();
+    for (Map.Entry<String, Map<String, ServerInstance>> entry : _tableTenantServersMap.entrySet()) {
+      String tableNameWithType = entry.getKey();
+      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      if (tableConfig == null) {
+        // Presumably the table was deleted after its tenant entry was built.
+        LOGGER.warn("Failed to find table config for table: {}, skipping adding server: {}", tableNameWithType,
+            instanceId);
+        continue;
+      }
+      String tableServerTag = getServerTagForTable(tableNameWithType, tableConfig);
+      if (tags.contains(tableServerTag)) {
+        // Keep an existing entry for this server untouched, as before.
+        entry.getValue().putIfAbsent(instanceId, serverInstance);
+      }
+    }
+  }
+
+  /**
+   * Resolves the server tag that instances hosting the given table carry.
+   *
+   * <p>Offline tables map to the offline tag of the table's server tenant; every other table is treated as realtime
+   * and maps to the tenant's realtime tag.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param tableConfig config of that table, used to read the server tenant name
+   * @return server tag to match instance tags against
+   */
+  private String getServerTagForTable(String tableNameWithType, TableConfig tableConfig) {
+    String tenant = tableConfig.getTenantConfig().getServer();
+    if (!TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+      return TagNameUtils.getRealtimeTagForTenant(tenant);
+    }
+    return TagNameUtils.getOfflineTagForTenant(tenant);
+  }
+
+
+  /**
+   * Drops the given server from the tenant-server entry of every table that lists it.
+   *
+   * @param server instance id of the server that is no longer enabled
+   */
+  private void deleteServerFromTableTenantServerMap(String server) {
+    _tableTenantServersMap.forEach((tableNameWithType, tenantServers) -> {
+      if (tenantServers.remove(server) != null) {
+        LOGGER.info("Removing entry for server={}, table={}", server, tableNameWithType);
+      }
+    });
+  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 775c7ff1d2..c47d16151c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -185,9 +185,12 @@ public class PinotQueryResource {
"Unable to find table in cluster")).toString();
}
+ // When routing a query, there should be at least one common broker tenant
for the table. However, the server
+ // tenants can be completely disjoint. The leaf stages which access
segments will be processed on the respective
+ // server tenants for each table. The intermediate stages can be processed
in either or all of the server tenants
+ // belonging to the tables.
String brokerTenant = getCommonBrokerTenant(tableConfigList);
- String serverTenant = getCommonServerTenant(tableConfigList);
- if (brokerTenant == null || serverTenant == null) {
+ if (brokerTenant == null) {
return
QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR,
new Exception(String.format("Unable to dispatch multistage query
with multiple tables : %s "
+ "on different tenant", tableNames))).toString();
@@ -277,18 +280,6 @@ public class PinotQueryResource {
return (String) (tableBrokers.toArray()[0]);
}
- // return the serverTenant if all table configs point to the same server,
else returns null
- private String getCommonServerTenant(List<TableConfig> tableConfigList) {
- Set<String> tableServers = new HashSet<>();
- for (TableConfig tableConfig : tableConfigList) {
- tableServers.add(tableConfig.getTenantConfig().getServer());
- }
- if (tableServers.size() != 1) {
- return null;
- }
- return (String) (tableServers.toArray()[0]);
- }
-
private String sendRequestToBroker(String query, String instanceId, String
traceEnabled, String queryOptions,
HttpHeaders httpHeaders) {
InstanceConfig instanceConfig =
_pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index 857f0207da..2d6ad0a8ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -38,7 +38,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
public interface RoutingManager {
/**
- * Get all enabled server instances that are available for routing.
+ * Get all enabled server instances in the cluster.
*
* @return all currently enabled server instances.
*/
@@ -67,4 +67,12 @@ public interface RoutingManager {
* @return time boundary info.
*/
TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName);
+
+ /**
+ * Returns the enabled server instances that belong to the server tenant of the given table.
+ *
+ * <p>Unlike {@link #getEnabledServerInstanceMap()}, which covers every enabled server in the cluster, this is
+ * scoped to the servers of the table's tenant. The multi-stage worker manager uses it to pick workers for
+ * intermediate stages.
+ *
+ * @param tableNameWithType name of the table with type suffix
+ * @return the enabled servers for the table's server tenant; implementations should return an empty map rather
+ *         than {@code null} for a table they know nothing about
+ */
+ Map<String, ServerInstance> getEnabledServersForTableTenant(String
tableNameWithType);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index ff9b9150b1..be9406e475 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -20,7 +20,10 @@ package org.apache.pinot.query;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
import java.util.Properties;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
@@ -149,7 +152,9 @@ public class QueryEnvironment {
try (PlannerContext plannerContext = new PlannerContext(_config,
_catalogReader, _typeFactory, _hepProgram)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(),
plannerContext);
- return new QueryPlannerResult(toDispatchablePlan(relRoot,
plannerContext, requestId), null, relRoot.rel);
+ Set<String> tableNames = getTableNamesFromRelRoot(relRoot.rel);
+ return new QueryPlannerResult(toDispatchablePlan(relRoot,
plannerContext, requestId, tableNames), null,
+ tableNames);
} catch (CalciteContextException e) {
throw new RuntimeException("Error composing query plan for '" + sqlQuery
+ "': " + e.getMessage() + "'", e);
@@ -177,7 +182,8 @@ public class QueryEnvironment {
SqlExplainFormat format = explain.getFormat() == null ?
SqlExplainFormat.DOT : explain.getFormat();
SqlExplainLevel level =
explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES
: explain.getDetailLevel();
- return new QueryPlannerResult(null,
PlannerUtils.explainPlan(relRoot.rel, format, level), relRoot.rel);
+ Set<String> tableNames = getTableNamesFromRelRoot(relRoot.rel);
+ return new QueryPlannerResult(null,
PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames);
} catch (Exception e) {
throw new RuntimeException("Error explain query plan for: " + sqlQuery,
e);
}
@@ -199,12 +205,12 @@ public class QueryEnvironment {
public static class QueryPlannerResult {
private QueryPlan _queryPlan;
private String _explainPlan;
- private RelNode _relRoot;
+ Set<String> _tableNames;
- QueryPlannerResult(@Nullable QueryPlan queryPlan, @Nullable String
explainPlan, RelNode relRoot) {
+ QueryPlannerResult(@Nullable QueryPlan queryPlan, @Nullable String
explainPlan, Set<String> tableNames) {
_queryPlan = queryPlan;
_explainPlan = explainPlan;
- _relRoot = relRoot;
+ _tableNames = tableNames;
}
public String getExplainPlan() {
@@ -215,8 +221,9 @@ public class QueryEnvironment {
return _queryPlan;
}
- public RelNode getRelRoot() {
- return _relRoot;
+ // Returns all the table names in the query.
+ public Set<String> getTableNames() {
+ return _tableNames;
}
}
@@ -267,10 +274,11 @@ public class QueryEnvironment {
}
}
- private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext
plannerContext, long requestId) {
+ private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext
plannerContext, long requestId,
+ Set<String> tableNames) {
// 5. construct a dispatchable query plan.
StagePlanner queryStagePlanner = new StagePlanner(plannerContext,
_workerManager, requestId, _tableCache);
- return queryStagePlanner.makePlan(relRoot);
+ return queryStagePlanner.makePlan(relRoot, tableNames);
}
// --------------------------------------------------------------------------
@@ -280,4 +288,17 @@ public class QueryEnvironment {
private HintStrategyTable getHintStrategyTable() {
return PinotHintStrategyTable.PINOT_HINT_STRATEGY_TABLE;
}
+
+
+  /**
+   * Extracts the names of all tables referenced by the given relational plan.
+   *
+   * @param relRoot root of the relational plan
+   * @return de-duplicated table names with Calcite's enclosing square brackets removed
+   */
+  private Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
+    List<String> qualifiedNames = RelOptUtil.findAllTableQualifiedNames(relRoot);
+    Set<String> result = new HashSet<>();
+    // Calcite wraps qualified names in square brackets (e.g. "[myTable]") for quoting; strip that outer pair.
+    qualifiedNames.forEach(qualifiedName -> result.add(qualifiedName.replaceAll("^\\[(.*)\\]$", "$1")));
+    return result;
+  }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 89b66d8f5d..7b98d63893 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.planner.logical;
import java.util.List;
+import java.util.Set;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
@@ -66,7 +67,7 @@ public class StagePlanner {
* @param relRoot relational plan root.
* @return dispatchable plan.
*/
- public QueryPlan makePlan(RelRoot relRoot) {
+ public QueryPlan makePlan(RelRoot relRoot, Set<String> tableNames) {
RelNode relRootNode = relRoot.rel;
// Stage ID starts with 1, 0 will be reserved for ROOT stage.
_stageIdCounter = 1;
@@ -86,7 +87,7 @@ public class StagePlanner {
// perform physical plan conversion and assign workers to each stage.
DispatchablePlanContext physicalPlanContext = new DispatchablePlanContext(
- _workerManager, _requestId, _plannerContext, relRoot.fields
+ _workerManager, _requestId, _plannerContext, relRoot.fields, tableNames
);
DispatchablePlanVisitor.INSTANCE.constructDispatchablePlan(globalReceiverNode,
physicalPlanContext);
QueryPlan queryPlan = physicalPlanContext.getQueryPlan();
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 8e7bca3d1f..7f333775b7 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.physical;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
@@ -31,13 +32,15 @@ public class DispatchablePlanContext {
private final long _requestId;
private final PlannerContext _plannerContext;
private final QueryPlan _queryPlan;
+ private final Set<String> _tableNames;
public DispatchablePlanContext(WorkerManager workerManager, long requestId,
PlannerContext plannerContext,
- List<Pair<Integer, String>> resultFields) {
+ List<Pair<Integer, String>> resultFields, Set<String> tableNames) {
_workerManager = workerManager;
_requestId = requestId;
_plannerContext = plannerContext;
_queryPlan = new QueryPlan(resultFields, new HashMap<>(), new HashMap<>());
+ _tableNames = tableNames;
}
public QueryPlan getQueryPlan() {
@@ -55,4 +58,9 @@ public class DispatchablePlanContext {
public PlannerContext getPlannerContext() {
return _plannerContext;
}
+
+ /**
+ * Returns the names of all tables referenced by the query being planned. Worker assignment receives these so
+ * intermediate stages can be placed on servers from the tables' tenants. May be empty when the query reads no
+ * table data.
+ */
+ public Set<String> getTableNames() {
+ return _tableNames;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index f92415314d..d7b1343141 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -59,7 +59,7 @@ public class DispatchablePlanVisitor implements
StageNodeVisitor<Void, Dispatcha
private void computeWorkerAssignment(StageNode node, DispatchablePlanContext
context) {
int stageId = node.getStageId();
context.getWorkerManager().assignWorkerToStage(stageId,
context.getQueryPlan().getStageMetadataMap().get(stageId),
- context.getRequestId(), context.getPlannerContext().getOptions());
+ context.getRequestId(), context.getPlannerContext().getOptions(),
context.getTableNames());
}
@Override
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index c1f896c86d..1e12c961dd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -21,11 +21,12 @@ package org.apache.pinot.query.routing;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
@@ -62,11 +63,11 @@ public class WorkerManager {
}
public void assignWorkerToStage(int stageId, StageMetadata stageMetadata,
long requestId,
- Map<String, String> options) {
- List<String> scannedTables = stageMetadata.getScannedTables();
- if (scannedTables.size() == 1) {
+ Map<String, String> options, Set<String> tableNames) {
+ if (isLeafStage(stageMetadata)) {
// --- LEAF STAGE ---
// table scan stage, need to attach server as well as segment info for
each physical table type.
+ List<String> scannedTables = stageMetadata.getScannedTables();
String logicalTableName = scannedTables.get(0);
Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName, requestId);
if (routingTableMap.size() == 0) {
@@ -112,13 +113,26 @@ public class WorkerManager {
new VirtualServer(new WorkerInstance(_hostName, _port, _port, _port,
_port), 0)));
} else {
// --- INTERMEDIATE STAGES ---
+ // If the query has more than one table, it is possible that the tables
could be hosted on different tenants.
+ // The intermediate stage will be processed on servers randomly picked
from the tenants belonging to either or
+ // all of the tables in the query.
// TODO: actually make assignment strategy decisions for intermediate
stages
-
stageMetadata.setServerInstances(assignServers(_routingManager.getEnabledServerInstanceMap().values(),
- stageMetadata.isRequiresSingletonInstance(), options));
+ Set<ServerInstance> serverInstances = new HashSet<>();
+ if (tableNames.size() == 0) {
+ // This could be the case from queries that don't actually fetch
values from the tables. In such cases the
+ // routing need not be tenant aware.
+ // Eg: SELECT 1 AS one FROM select_having_expression_test_test_having
HAVING 1 > 2;
+ serverInstances =
_routingManager.getEnabledServerInstanceMap().values().stream().collect(Collectors.toSet());
+ } else {
+ serverInstances = fetchServersForIntermediateStage(tableNames);
+ }
+
+ stageMetadata.setServerInstances(
+ assignServers(serverInstances,
stageMetadata.isRequiresSingletonInstance(), options));
}
}
- private static List<VirtualServer> assignServers(Collection<ServerInstance>
servers,
+ private static List<VirtualServer> assignServers(Set<ServerInstance> servers,
boolean requiresSingletonInstance, Map<String, String> options) {
int stageParallelism = Integer.parseInt(
options.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.STAGE_PARALLELISM,
"1"));
@@ -183,4 +197,36 @@ public class WorkerManager {
return _routingManager.getRoutingTable(
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
tableNameWithType), requestId);
}
+
+  /**
+   * Classifies a stage as a leaf stage when it scans exactly one table.
+   *
+   * <p>TODO: find a more reliable way to tell leaf stages from intermediate ones; a query plan could process table
+   * data in intermediate stages too.
+   */
+  private boolean isLeafStage(StageMetadata stageMetadata) {
+    List<String> scannedTables = stageMetadata.getScannedTables();
+    return scannedTables.size() == 1;
+  }
+
+  /**
+   * Collects the enabled servers from the server tenants of all tables referenced by the query.
+   *
+   * <p>A table name without a type suffix may be backed by an offline table, a realtime table, or both. The tenants
+   * of both physical tables are therefore included. A name with a type suffix contributes only its own tenant.
+   *
+   * @param tableNames table names referenced by the query, with or without type suffix
+   * @return union of the tenant servers of all the tables; may be empty if the routing manager knows none of them
+   */
+  private Set<ServerInstance> fetchServersForIntermediateStage(Set<String> tableNames) {
+    Set<ServerInstance> serverInstances = new HashSet<>();
+    for (String table : tableNames) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(table);
+      if (tableType == null) {
+        // Untyped name: include the servers of both the offline and the realtime table's tenants.
+        String rawTableName = TableNameBuilder.extractRawTableName(table);
+        String offlineTable = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+        String realtimeTable = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
+        addTenantServers(offlineTable, serverInstances);
+        addTenantServers(realtimeTable, serverInstances);
+      } else {
+        addTenantServers(table, serverInstances);
+      }
+    }
+    return serverInstances;
+  }
+
+  /** Adds the enabled servers of the given typed table's server tenant to {@code serverInstances}. */
+  private void addTenantServers(String tableNameWithType, Set<ServerInstance> serverInstances) {
+    serverInstances.addAll(_routingManager.getEnabledServersForTableTenant(tableNameWithType).values());
+  }
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 5e75de0455..46dd4aff8f 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -161,5 +161,10 @@ public class MockRoutingManagerFactory {
return _hybridTables.contains(rawTableName) ? new
TimeBoundaryInfo(TIME_BOUNDARY_COLUMN,
String.valueOf(System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(1))) : null;
}
+
+ // Tenants are not modeled in this mock: the same _serverInstances map is returned for every table, so
+ // intermediate stages may be assigned to any mocked server.
+ @Override
+ public Map<String, ServerInstance> getEnabledServersForTableTenant(String
tableNameWithType) {
+ return _serverInstances;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]