Jackie-Jiang commented on code in PR #10886:
URL: https://github.com/apache/pinot/pull/10886#discussion_r1230256772


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -206,41 +179,318 @@ private Map<String, RoutingTable> getRoutingTable(String 
logicalTableName, long
   }
 
   private RoutingTable getRoutingTable(String tableName, TableType tableType, 
long requestId) {
-    String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(
-        TableNameBuilder.extractRawTableName(tableName));
+    String tableNameWithType =
+        
TableNameBuilder.forType(tableType).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
     return _routingManager.getRoutingTable(
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
tableNameWithType), requestId);
   }
 
-  // TODO: Find a better way to determine whether a stage is leaf stage or 
intermediary. We could have query plans that
-  //       process table data even in intermediary stages.
-  private boolean isLeafStage(DispatchablePlanMetadata 
dispatchablePlanMetadata) {
-    return dispatchablePlanMetadata.getScannedTables().size() == 1;
+  private void assignWorkersToIntermediateFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
+    if (isColocatedJoin(fragment.getFragmentRoot())) {
+      try {
+        assignWorkersForColocatedJoin(fragment, context);
+        return;
+      } catch (Exception e) {
+        LOGGER.warn("[RequestId: {}] Caught exception while assigning workers 
for colocated join, "
+            + "falling back to regular worker assignment", 
context.getRequestId(), e);
+      }
+    }
+
+    // If the query has more than one table, it is possible that the tables 
could be hosted on different tenants.
+    // The intermediate stage will be processed on servers randomly picked 
from the tenants belonging to either or
+    // all of the tables in the query.
+    // TODO: actually make assignment strategy decisions for intermediate 
stages
+    List<ServerInstance> serverInstances;
+    Set<String> tableNames = context.getTableNames();
+    if (tableNames.size() == 0) {
+      // TODO: Short circuit it when no table needs to be scanned
+      // This could be the case from queries that don't actually fetch values 
from the tables. In such cases the
+      // routing need not be tenant aware.
+      // Eg: SELECT 1 AS one FROM select_having_expression_test_test_having 
HAVING 1 > 2;
+      serverInstances = new 
ArrayList<>(_routingManager.getEnabledServerInstanceMap().values());
+    } else {
+      serverInstances = fetchServersForIntermediateStage(tableNames);
+    }
+    DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
+    Map<String, String> options = context.getPlannerContext().getOptions();
+    int stageParallelism = 
Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
+    if (metadata.isRequiresSingletonInstance()) {
+      // require singleton should return a single global worker ID with 0;
+      ServerInstance serverInstance = 
serverInstances.get(RANDOM.nextInt(serverInstances.size()));
+      metadata.setServerInstanceToWorkerIdMap(
+          Collections.singletonMap(new QueryServerInstance(serverInstance), 
Collections.singletonList(0)));
+      metadata.setTotalWorkerCount(1);
+    } else {
+      Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = 
new HashMap<>();
+      int nextWorkerId = 0;
+      for (ServerInstance serverInstance : serverInstances) {
+        List<Integer> workerIds = new ArrayList<>();
+        for (int i = 0; i < stageParallelism; i++) {
+          workerIds.add(nextWorkerId++);
+        }
+        serverInstanceToWorkerIdMap.put(new 
QueryServerInstance(serverInstance), workerIds);
+      }
+      metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+      metadata.setTotalWorkerCount(nextWorkerId);
+    }
+
+    for (PlanFragment child : fragment.getChildren()) {
+      assignWorkersToNonRootFragment(child, context);
+    }
   }
 
-  private Set<ServerInstance> fetchServersForIntermediateStage(Set<String> 
tableNames) {
-    Set<ServerInstance> serverInstances = new HashSet<>();
+  private boolean isColocatedJoin(PlanNode planNode) {
+    if (planNode instanceof JoinNode) {
+      return ((JoinNode) planNode).isColocatedJoin();
+    }
+    for (PlanNode child : planNode.getInputs()) {
+      if (isColocatedJoin(child)) {
+        return true;
+      }
+    }
+    return false;
+  }
 
-    for (String table : tableNames) {
-      String rawTableName = TableNameBuilder.extractRawTableName(table);
-      TableType tableType = TableNameBuilder.getTableTypeFromTableName(table);
-      if (tableType == null) {
-        String offlineTable = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
-        String realtimeTable = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
+  private void assignWorkersForColocatedJoin(PlanFragment fragment, 
DispatchablePlanContext context) {
+    List<PlanFragment> children = fragment.getChildren();
+    Preconditions.checkArgument(children.size() == 2, "Expecting 2 children, 
find: %s", children.size());
+    PlanFragment leftFragment = children.get(0);
+    PlanFragment rightFragment = children.get(1);
+    Map<Integer, DispatchablePlanMetadata> metadataMap = 
context.getDispatchablePlanMetadataMap();
+    DispatchablePlanMetadata leftMetadata = 
metadataMap.get(leftFragment.getFragmentId());
+    Preconditions.checkArgument(isLeafPlan(leftMetadata), "Left side is not 
leaf");
+    DispatchablePlanMetadata rightMetadata = 
metadataMap.get(rightFragment.getFragmentId());
+    Preconditions.checkArgument(isLeafPlan(rightMetadata), "Right side is not 
leaf");

Review Comment:
   The current implementation works only if we are directly accessing the table. In 
order to make multiple joins colocated, we need to find a way to gather all 
tables and pick the common servers from all of them. Added a TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to