This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 18adba0c91 [multistage] Make Intermediate Stage Worker Assignment Tenant Aware (#10617)
18adba0c91 is described below

commit 18adba0c91f7b28dec829b5882420b6d7ff7a833
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Wed Apr 19 10:10:39 2023 -0700

    [multistage] Make Intermediate Stage Worker Assignment Tenant Aware (#10617)
    
    * Make Intermediate Stage Worker Assignment Tenant Aware
    
    * Address review comments
---
 .../MultiStageBrokerRequestHandler.java            | 13 +---
 .../pinot/broker/routing/BrokerRoutingManager.java | 78 +++++++++++++++++++++-
 .../api/resources/PinotQueryResource.java          | 19 ++----
 .../apache/pinot/core/routing/RoutingManager.java  | 10 ++-
 .../org/apache/pinot/query/QueryEnvironment.java   | 39 ++++++++---
 .../pinot/query/planner/logical/StagePlanner.java  |  5 +-
 .../planner/physical/DispatchablePlanContext.java  | 10 ++-
 .../planner/physical/DispatchablePlanVisitor.java  |  2 +-
 .../apache/pinot/query/routing/WorkerManager.java  | 60 +++++++++++++++--
 .../query/testutils/MockRoutingManagerFactory.java |  5 ++
 10 files changed, 195 insertions(+), 46 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 9e74dcf400..249772d56e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -21,15 +21,12 @@ package org.apache.pinot.broker.requesthandler;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -163,8 +160,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         case EXPLAIN:
           queryPlanResult = _queryEnvironment.explainQuery(query, 
sqlNodeAndOptions);
           String plan = queryPlanResult.getExplainPlan();
-          RelNode explainRelRoot = queryPlanResult.getRelRoot();
-          if (!hasTableAccess(requesterIdentity, 
getTableNamesFromRelRoot(explainRelRoot), requestId, requestContext)) {
+          Set<String> tableNames = queryPlanResult.getTableNames();
+          if (!hasTableAccess(requesterIdentity, tableNames, requestId, 
requestContext)) {
             return new 
BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
           }
 
@@ -183,7 +180,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     }
 
     QueryPlan queryPlan = queryPlanResult.getQueryPlan();
-    Set<String> tableNames = 
getTableNamesFromRelRoot(queryPlanResult.getRelRoot());
+    Set<String> tableNames = queryPlanResult.getTableNames();
 
     // Compilation Time. This includes the time taken for parsing, compiling, 
create stage plans and assigning workers.
     long compilationEndTimeNs = System.nanoTime();
@@ -288,10 +285,6 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     return false;
   }
 
-  private Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
-    return new HashSet<>(RelOptUtil.findAllTableQualifiedNames(relRoot));
-  }
-
   private void updatePhaseTimingForTables(Set<String> tableNames,
       BrokerQueryPhase phase, long time) {
     for (String tableName : tableNames) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 93edc45f52..d62dc639fb 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -57,6 +57,8 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -102,11 +104,16 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   private final ServerRoutingStatsManager _serverRoutingStatsManager;
   private final PinotConfiguration _pinotConfig;
 
+  // Map that contains the tableNameWithType as key and the enabled 
serverInstances that are tagged with the table's
+  // tenant.
+  private final Map<String, Map<String, ServerInstance>> 
_tableTenantServersMap = new ConcurrentHashMap<>();
+
   private BaseDataAccessor<ZNRecord> _zkDataAccessor;
   private String _externalViewPathPrefix;
   private String _idealStatePathPrefix;
   private String _instanceConfigsPath;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private HelixManager _helixManager;
 
   private Set<String> _routableServers;
 
@@ -125,6 +132,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     _idealStatePathPrefix = 
helixDataAccessor.keyBuilder().idealStates().getPath() + "/";
     _instanceConfigsPath = 
helixDataAccessor.keyBuilder().instanceConfigs().getPath();
     _propertyStore = helixManager.getHelixPropertyStore();
+    _helixManager = helixManager;
   }
 
   @Override
@@ -244,7 +252,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
         enabledServers.add(instanceId);
 
         // Always refresh the server instance with the latest instance config 
in case it changes
-        ServerInstance serverInstance = new ServerInstance(new 
InstanceConfig(instanceConfigZNRecord));
+        InstanceConfig instanceConfig = new 
InstanceConfig(instanceConfigZNRecord);
+        ServerInstance serverInstance = new ServerInstance(instanceConfig);
         if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null) 
{
           newEnabledServers.add(instanceId);
 
@@ -252,6 +261,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
           if (_excludedServers.remove(instanceId)) {
             LOGGER.info("Got excluded server: {} re-enabled, including it into 
the routing", instanceId);
           }
+
+          addNewServerToTableTenantServerMap(instanceId, serverInstance, 
instanceConfig);
         }
       }
     }
@@ -259,6 +270,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     for (String instance : _enabledServerInstanceMap.keySet()) {
       if (!enabledServers.contains(instance)) {
         newDisabledServers.add(instance);
+        deleteServerFromTableTenantServerMap(instance);
       }
     }
 
@@ -408,6 +420,9 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", tableNameWithType);
 
+    // Build a mapping from the table to the list of servers assigned to the 
table's tenant.
+    buildTableTenantServerMap(tableNameWithType, tableConfig);
+
     String idealStatePath = getIdealStatePath(tableNameWithType);
     IdealState idealState = getIdealState(idealStatePath);
     Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
@@ -545,6 +560,10 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     } else {
       LOGGER.warn("Routing does not exist for table: {}, skipping removing 
routing", tableNameWithType);
     }
+
+    if (_tableTenantServersMap.remove(tableNameWithType) != null) {
+      LOGGER.info("Removed tenant servers for table: {}", tableNameWithType);
+    }
   }
 
   /**
@@ -603,6 +622,12 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     return _enabledServerInstanceMap;
   }
 
+  @Override
+  public Map<String, ServerInstance> getEnabledServersForTableTenant(String 
tableNameWithType) {
+    return _tableTenantServersMap.containsKey(tableNameWithType) ? 
_tableTenantServersMap.get(tableNameWithType)
+        : new HashMap<String, ServerInstance>();
+  }
+
   private String getIdealStatePath(String tableNameWithType) {
     return _idealStatePathPrefix + tableNameWithType;
   }
@@ -744,4 +769,55 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
       }
     }
   }
+
+  private void buildTableTenantServerMap(String tableNameWithType, TableConfig 
tableConfig) {
+    String serverTag = getServerTagForTable(tableNameWithType, tableConfig);
+    List<InstanceConfig> allInstanceConfigs = 
HelixHelper.getInstanceConfigs(_helixManager);
+    List<InstanceConfig> instanceConfigsWithTag = 
HelixHelper.getInstancesConfigsWithTag(allInstanceConfigs, serverTag);
+    Map<String, ServerInstance> serverInstances = new HashMap<>();
+    for (InstanceConfig serverInstanceConfig : instanceConfigsWithTag) {
+      serverInstances.put(serverInstanceConfig.getInstanceName(), new 
ServerInstance(serverInstanceConfig));
+    }
+    _tableTenantServersMap.put(tableNameWithType, serverInstances);
+    LOGGER.info("Built map for table={} with {} server instances.", 
tableNameWithType, serverInstances.size());
+  }
+
+  private void addNewServerToTableTenantServerMap(String instanceId, 
ServerInstance serverInstance,
+      InstanceConfig instanceConfig) {
+    List<String> tags = instanceConfig.getTags();
+
+    for (Map.Entry<String, Map<String, ServerInstance>> entry : 
_tableTenantServersMap.entrySet()) {
+      String tableNameWithType = entry.getKey();
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      String tableServerTag = getServerTagForTable(tableNameWithType, 
tableConfig);
+
+      Map<String, ServerInstance> tenantServerMap = entry.getValue();
+
+      if (!tenantServerMap.containsKey(instanceId) && 
tags.contains(tableServerTag)) {
+        tenantServerMap.put(instanceId, serverInstance);
+      }
+    }
+  }
+
+  private String getServerTagForTable(String tableNameWithType, TableConfig 
tableConfig) {
+    String serverTenantName = tableConfig.getTenantConfig().getServer();
+    String serverTag;
+    if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+      serverTag = TagNameUtils.getOfflineTagForTenant(serverTenantName);
+    } else {
+      // Realtime table
+      serverTag = TagNameUtils.getRealtimeTagForTenant(serverTenantName);
+    }
+
+    return serverTag;
+  }
+
+
+  private void deleteServerFromTableTenantServerMap(String server) {
+    for (Map.Entry<String, Map<String, ServerInstance>> entry : 
_tableTenantServersMap.entrySet()) {
+      if (entry.getValue().remove(server) != null) {
+        LOGGER.info("Removing entry for server={}, table={}", server, 
entry.getKey());
+      }
+    }
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 775c7ff1d2..c47d16151c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -185,9 +185,12 @@ public class PinotQueryResource {
           "Unable to find table in cluster")).toString();
     }
 
+    // When routing a query, there should be at least one common broker tenant 
for the table. However, the server
+    // tenants can be completely disjoint. The leaf stages which access 
segments will be processed on the respective
+    // server tenants for each table. The intermediate stages can be processed 
in either or all of the server tenants
+    // belonging to the tables.
     String brokerTenant = getCommonBrokerTenant(tableConfigList);
-    String serverTenant = getCommonServerTenant(tableConfigList);
-    if (brokerTenant == null || serverTenant == null) {
+    if (brokerTenant == null) {
       return 
QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR,
           new Exception(String.format("Unable to dispatch multistage query 
with multiple tables : %s "
               + "on different tenant", tableNames))).toString();
@@ -277,18 +280,6 @@ public class PinotQueryResource {
     return (String) (tableBrokers.toArray()[0]);
   }
 
-  // return the serverTenant if all table configs point to the same server, 
else returns null
-  private String getCommonServerTenant(List<TableConfig> tableConfigList) {
-    Set<String> tableServers = new HashSet<>();
-    for (TableConfig tableConfig : tableConfigList) {
-      tableServers.add(tableConfig.getTenantConfig().getServer());
-    }
-    if (tableServers.size() != 1) {
-      return null;
-    }
-    return (String) (tableServers.toArray()[0]);
-  }
-
   private String sendRequestToBroker(String query, String instanceId, String 
traceEnabled, String queryOptions,
       HttpHeaders httpHeaders) {
     InstanceConfig instanceConfig = 
_pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index 857f0207da..2d6ad0a8ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -38,7 +38,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
 public interface RoutingManager {
 
   /**
-   * Get all enabled server instances that are available for routing.
+   * Get all enabled server instances in the cluster.
    *
    * @return all currently enabled server instances.
    */
@@ -67,4 +67,12 @@ public interface RoutingManager {
    * @return time boundary info.
    */
   TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName);
+
+  /**
+   * Returns all enabled server instances for a given table's server tenant.
+   *
+   * @param tableNameWithType name of the table with type
+   * @return all enabled servers for a table's server tenant
+   */
+  Map<String, ServerInstance> getEnabledServersForTableTenant(String 
tableNameWithType);
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index ff9b9150b1..be9406e475 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -20,7 +20,10 @@ package org.apache.pinot.query;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
@@ -149,7 +152,9 @@ public class QueryEnvironment {
     try (PlannerContext plannerContext = new PlannerContext(_config, 
_catalogReader, _typeFactory, _hepProgram)) {
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), 
plannerContext);
-      return new QueryPlannerResult(toDispatchablePlan(relRoot, 
plannerContext, requestId), null, relRoot.rel);
+      Set<String> tableNames = getTableNamesFromRelRoot(relRoot.rel);
+      return new QueryPlannerResult(toDispatchablePlan(relRoot, 
plannerContext, requestId, tableNames), null,
+          tableNames);
     } catch (CalciteContextException e) {
       throw new RuntimeException("Error composing query plan for '" + sqlQuery
           + "': " + e.getMessage() + "'", e);
@@ -177,7 +182,8 @@ public class QueryEnvironment {
       SqlExplainFormat format = explain.getFormat() == null ? 
SqlExplainFormat.DOT : explain.getFormat();
       SqlExplainLevel level =
           explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES 
: explain.getDetailLevel();
-      return new QueryPlannerResult(null, 
PlannerUtils.explainPlan(relRoot.rel, format, level), relRoot.rel);
+      Set<String> tableNames = getTableNamesFromRelRoot(relRoot.rel);
+      return new QueryPlannerResult(null, 
PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames);
     } catch (Exception e) {
       throw new RuntimeException("Error explain query plan for: " + sqlQuery, 
e);
     }
@@ -199,12 +205,12 @@ public class QueryEnvironment {
   public static class QueryPlannerResult {
     private QueryPlan _queryPlan;
     private String _explainPlan;
-    private RelNode _relRoot;
+    Set<String> _tableNames;
 
-    QueryPlannerResult(@Nullable QueryPlan queryPlan, @Nullable String 
explainPlan, RelNode relRoot) {
+    QueryPlannerResult(@Nullable QueryPlan queryPlan, @Nullable String 
explainPlan, Set<String> tableNames) {
       _queryPlan = queryPlan;
       _explainPlan = explainPlan;
-      _relRoot = relRoot;
+      _tableNames = tableNames;
     }
 
     public String getExplainPlan() {
@@ -215,8 +221,9 @@ public class QueryEnvironment {
       return _queryPlan;
     }
 
-    public RelNode getRelRoot() {
-      return _relRoot;
+    // Returns all the table names in the query.
+    public Set<String> getTableNames() {
+      return _tableNames;
     }
   }
 
@@ -267,10 +274,11 @@ public class QueryEnvironment {
     }
   }
 
-  private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext 
plannerContext, long requestId) {
+  private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext 
plannerContext, long requestId,
+      Set<String> tableNames) {
     // 5. construct a dispatchable query plan.
     StagePlanner queryStagePlanner = new StagePlanner(plannerContext, 
_workerManager, requestId, _tableCache);
-    return queryStagePlanner.makePlan(relRoot);
+    return queryStagePlanner.makePlan(relRoot, tableNames);
   }
 
   // --------------------------------------------------------------------------
@@ -280,4 +288,17 @@ public class QueryEnvironment {
   private HintStrategyTable getHintStrategyTable() {
     return PinotHintStrategyTable.PINOT_HINT_STRATEGY_TABLE;
   }
+
+
+  private Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
+    Set<String> tableNames = new HashSet<>();
+    List<String> qualifiedTableNames = 
RelOptUtil.findAllTableQualifiedNames(relRoot);
+    for (String qualifiedTableName : qualifiedTableNames) {
+      // Calcite encloses table and schema names in square brackets to 
properly quote and delimit them in SQL
+      // statements, particularly to handle cases when they contain special 
characters or reserved keywords.
+      String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
+      tableNames.add(tableName);
+    }
+    return tableNames;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 89b66d8f5d..7b98d63893 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.planner.logical;
 
 import java.util.List;
+import java.util.Set;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
@@ -66,7 +67,7 @@ public class StagePlanner {
    * @param relRoot relational plan root.
    * @return dispatchable plan.
    */
-  public QueryPlan makePlan(RelRoot relRoot) {
+  public QueryPlan makePlan(RelRoot relRoot, Set<String> tableNames) {
     RelNode relRootNode = relRoot.rel;
     // Stage ID starts with 1, 0 will be reserved for ROOT stage.
     _stageIdCounter = 1;
@@ -86,7 +87,7 @@ public class StagePlanner {
 
     // perform physical plan conversion and assign workers to each stage.
     DispatchablePlanContext physicalPlanContext = new DispatchablePlanContext(
-        _workerManager, _requestId, _plannerContext, relRoot.fields
+        _workerManager, _requestId, _plannerContext, relRoot.fields, tableNames
     );
     
DispatchablePlanVisitor.INSTANCE.constructDispatchablePlan(globalReceiverNode, 
physicalPlanContext);
     QueryPlan queryPlan = physicalPlanContext.getQueryPlan();
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 8e7bca3d1f..7f333775b7 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.physical;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import org.apache.calcite.util.Pair;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -31,13 +32,15 @@ public class DispatchablePlanContext {
   private final long _requestId;
   private final PlannerContext _plannerContext;
   private final QueryPlan _queryPlan;
+  private final Set<String> _tableNames;
 
   public DispatchablePlanContext(WorkerManager workerManager, long requestId, 
PlannerContext plannerContext,
-      List<Pair<Integer, String>> resultFields) {
+      List<Pair<Integer, String>> resultFields, Set<String> tableNames) {
     _workerManager = workerManager;
     _requestId = requestId;
     _plannerContext = plannerContext;
     _queryPlan = new QueryPlan(resultFields, new HashMap<>(), new HashMap<>());
+    _tableNames = tableNames;
   }
 
   public QueryPlan getQueryPlan() {
@@ -55,4 +58,9 @@ public class DispatchablePlanContext {
   public PlannerContext getPlannerContext() {
     return _plannerContext;
   }
+
+  // Returns all the table names.
+  public Set<String> getTableNames() {
+    return _tableNames;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index f92415314d..d7b1343141 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -59,7 +59,7 @@ public class DispatchablePlanVisitor implements 
StageNodeVisitor<Void, Dispatcha
   private void computeWorkerAssignment(StageNode node, DispatchablePlanContext 
context) {
     int stageId = node.getStageId();
     context.getWorkerManager().assignWorkerToStage(stageId, 
context.getQueryPlan().getStageMetadataMap().get(stageId),
-        context.getRequestId(), context.getPlannerContext().getOptions());
+        context.getRequestId(), context.getPlannerContext().getOptions(), 
context.getTableNames());
   }
 
   @Override
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index c1f896c86d..1e12c961dd 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -21,11 +21,12 @@ package org.apache.pinot.query.routing;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
@@ -62,11 +63,11 @@ public class WorkerManager {
   }
 
   public void assignWorkerToStage(int stageId, StageMetadata stageMetadata, 
long requestId,
-      Map<String, String> options) {
-    List<String> scannedTables = stageMetadata.getScannedTables();
-    if (scannedTables.size() == 1) {
+      Map<String, String> options, Set<String> tableNames) {
+    if (isLeafStage(stageMetadata)) {
       // --- LEAF STAGE ---
       // table scan stage, need to attach server as well as segment info for 
each physical table type.
+      List<String> scannedTables = stageMetadata.getScannedTables();
       String logicalTableName = scannedTables.get(0);
       Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName, requestId);
       if (routingTableMap.size() == 0) {
@@ -112,13 +113,26 @@ public class WorkerManager {
           new VirtualServer(new WorkerInstance(_hostName, _port, _port, _port, 
_port), 0)));
     } else {
       // --- INTERMEDIATE STAGES ---
+      // If the query has more than one table, it is possible that the tables 
could be hosted on different tenants.
+      // The intermediate stage will be processed on servers randomly picked 
from the tenants belonging to either or
+      // all of the tables in the query.
       // TODO: actually make assignment strategy decisions for intermediate 
stages
-      
stageMetadata.setServerInstances(assignServers(_routingManager.getEnabledServerInstanceMap().values(),
-          stageMetadata.isRequiresSingletonInstance(), options));
+      Set<ServerInstance> serverInstances = new HashSet<>();
+      if (tableNames.size() == 0) {
+        // This could be the case from queries that don't actually fetch 
values from the tables. In such cases the
+        // routing need not be tenant aware.
+        // Eg: SELECT 1 AS one FROM select_having_expression_test_test_having 
HAVING 1 > 2;
+        serverInstances = 
_routingManager.getEnabledServerInstanceMap().values().stream().collect(Collectors.toSet());
+      } else {
+        serverInstances = fetchServersForIntermediateStage(tableNames);
+      }
+
+      stageMetadata.setServerInstances(
+          assignServers(serverInstances, 
stageMetadata.isRequiresSingletonInstance(), options));
     }
   }
 
-  private static List<VirtualServer> assignServers(Collection<ServerInstance> 
servers,
+  private static List<VirtualServer> assignServers(Set<ServerInstance> servers,
       boolean requiresSingletonInstance, Map<String, String> options) {
     int stageParallelism = Integer.parseInt(
         
options.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.STAGE_PARALLELISM,
 "1"));
@@ -183,4 +197,36 @@ public class WorkerManager {
     return _routingManager.getRoutingTable(
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
tableNameWithType), requestId);
   }
+
+  // TODO: Find a better way to determine whether a stage is leaf stage or 
intermediary. We could have query plans that
+  //       process table data even in intermediary stages.
+  private boolean isLeafStage(StageMetadata stageMetadata) {
+    return stageMetadata.getScannedTables().size() == 1;
+  }
+
+  private Set<ServerInstance> fetchServersForIntermediateStage(Set<String> 
tableNames) {
+    Set<ServerInstance> serverInstances = new HashSet<>();
+
+    for (String table : tableNames) {
+      String rawTableName = TableNameBuilder.extractRawTableName(table);
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(table);
+      if (tableType == null) {
+        String offlineTable = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+        String realtimeTable = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
+
+        // Servers in the offline table's tenant.
+        Map<String, ServerInstance> servers = 
_routingManager.getEnabledServersForTableTenant(offlineTable);
+        serverInstances.addAll(servers.values());
+
+        // Servers in the realtime table's tenant.
+        servers = 
_routingManager.getEnabledServersForTableTenant(realtimeTable);
+        serverInstances.addAll(servers.values());
+      } else {
+        Map<String, ServerInstance> servers = 
_routingManager.getEnabledServersForTableTenant(table);
+        serverInstances.addAll(servers.values());
+      }
+    }
+
+    return serverInstances;
+  }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 5e75de0455..46dd4aff8f 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -161,5 +161,10 @@ public class MockRoutingManagerFactory {
       return _hybridTables.contains(rawTableName) ? new 
TimeBoundaryInfo(TIME_BOUNDARY_COLUMN,
           String.valueOf(System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(1))) : null;
     }
+
+    @Override
+    public Map<String, ServerInstance> getEnabledServersForTableTenant(String 
tableNameWithType) {
+      return _serverInstances;
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to