This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf07fea14c7 [federation] Add multi-cluster routing support for MSE 
queries (#17444)
cf07fea14c7 is described below

commit cf07fea14c784f4b60ae6932d91fa23abc963be8
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Jan 5 13:52:55 2026 -0800

    [federation] Add multi-cluster routing support for MSE queries (#17444)
---
 .../MultiStageBrokerRequestHandler.java            | 16 ++++-
 .../core/routing/MultiClusterRoutingContext.java   |  4 ++
 .../multicluster/MultiClusterIntegrationTest.java  | 84 ++++++++++++++++++----
 .../org/apache/pinot/query/QueryEnvironment.java   |  9 ++-
 .../planner/physical/DispatchablePlanVisitor.java  |  7 +-
 .../planner/physical/PinotDispatchPlanner.java     |  5 +-
 6 files changed, 105 insertions(+), 20 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 710ba7cccb6..69f3441529b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -129,6 +129,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   private static final int NUM_UNAVAILABLE_SEGMENTS_TO_LOG = 10;
 
   private final WorkerManager _workerManager;
+  private final WorkerManager _multiClusterWorkerManager;
   private final QueryDispatcher _queryDispatcher;
   private final boolean _explainAskingServerDefault;
   private final MultiStageQueryThrottler _queryThrottler;
@@ -146,7 +147,16 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         threadAccountant, multiClusterRoutingContext);
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = 
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
+
     _workerManager = new WorkerManager(_brokerId, hostname, port, 
_routingManager);
+    if (multiClusterRoutingContext != null) {
+      _multiClusterWorkerManager = new WorkerManager(_brokerId, hostname, port,
+          multiClusterRoutingContext.getMultiClusterRoutingManager());
+    } else {
+      // if multi-cluster routing is not enabled, use the same worker manager.
+      _multiClusterWorkerManager = _workerManager;
+    }
+
     TlsConfig tlsConfig = 
config.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
         CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? 
TlsUtils.extractTlsConfig(config,
         CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
@@ -373,7 +383,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
     try {
       ImmutableQueryEnvironment.Config queryEnvConf = 
getQueryEnvConf(httpHeaders, queryOptions, requestId);
-      QueryEnvironment queryEnv = new QueryEnvironment(queryEnvConf);
+      QueryEnvironment queryEnv = new QueryEnvironment(queryEnvConf, 
_multiClusterRoutingContext);
       return callAsync(requestId, query, () -> queryEnv.compile(query, 
sqlNodeAndOptions), queryTimer);
     } catch (WebApplicationException e) {
       throw e;
@@ -461,11 +471,13 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     int sortExchangeCopyThreshold = _config.getProperty(
         CommonConstants.Broker.CONFIG_OF_SORT_EXCHANGE_COPY_THRESHOLD,
         CommonConstants.Broker.DEFAULT_SORT_EXCHANGE_COPY_THRESHOLD);
+    WorkerManager workerManager = 
QueryOptionsUtils.isMultiClusterRoutingEnabled(queryOptions, false)
+        ? _multiClusterWorkerManager : _workerManager;
     return QueryEnvironment.configBuilder()
         .requestId(requestId)
         .database(database)
         .tableCache(_tableCache)
-        .workerManager(_workerManager)
+        .workerManager(workerManager)
         .isCaseSensitive(caseSensitive)
         
.isNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(queryOptions))
         .defaultInferPartitionHint(inferPartitionHint)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
index 251e84ee048..c40f3aee6f5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java
@@ -77,4 +77,8 @@ public class MultiClusterRoutingContext {
     }
     return _localRoutingManager;
   }
+
+  public RoutingManager getMultiClusterRoutingManager() {
+    return _multiClusterRoutingManager;
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index e2857348a85..7173e6f6008 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -86,8 +86,12 @@ public class MultiClusterIntegrationTest extends ClusterTest 
{
   protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
   protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE = 
"logical_federation_table_cluster1";
   protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE = 
"logical_federation_table_cluster2";
-  protected static final int CLUSTER_1_SIZE = 1500;
-  protected static final int CLUSTER_2_SIZE = 1000;
+  protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE_2 = 
"logical_federation_table2_cluster1";
+  protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE_2 = 
"logical_federation_table2_cluster2";
+  protected static final int TABLE_SIZE_CLUSTER_1 = 1500;
+  protected static final int TABLE_SIZE_CLUSTER_2 = 1000;
+  protected static final int SEGMENTS_PER_CLUSTER = 3;
+  protected static final String JOIN_COLUMN = "OriginCityName";
 
   protected ClusterComponents _cluster1;
   protected ClusterComponents _cluster2;
@@ -139,8 +143,8 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     createSchemaAndTableOnBothClusters(testTableName);
 
     // Create and load test data into both clusters
-    _cluster1AvroFiles = createAvroData(CLUSTER_1_SIZE, 1);
-    _cluster2AvroFiles = createAvroData(CLUSTER_2_SIZE, 2);
+    _cluster1AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_1, 1);
+    _cluster2AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_2, 2);
 
     loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
     loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
@@ -150,13 +154,13 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     String result1 = executeQuery(query, _cluster1);
     assertNotNull(result1, "Query result from cluster 1 should not be null");
     long count1 = parseCountResult(result1);
-    assertEquals(count1, CLUSTER_1_SIZE);
+    assertEquals(count1, TABLE_SIZE_CLUSTER_1);
 
     // Verify cluster 2 is queryable
     String result2 = executeQuery(query, _cluster2);
     assertNotNull(result2, "Query result from cluster 2 should not be null");
     long count2 = parseCountResult(result2);
-    assertEquals(count2, CLUSTER_2_SIZE);
+    assertEquals(count2, TABLE_SIZE_CLUSTER_2);
 
     LOGGER.info("Multi-cluster broker test passed: both clusters started and 
queryable");
   }
@@ -171,15 +175,44 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
         LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
     cleanSegmentDirs();
-    _cluster1AvroFiles = createAvroData(CLUSTER_1_SIZE, 1);
-    _cluster2AvroFiles = createAvroData(CLUSTER_2_SIZE, 2);
+    _cluster1AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_1, 1);
+    _cluster2AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_2, 2);
     loadDataIntoCluster(_cluster1AvroFiles, 
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
     loadDataIntoCluster(_cluster2AvroFiles, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
-    long expectedTotal = CLUSTER_1_SIZE + CLUSTER_2_SIZE;
+    long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
     assertEquals(getCount(LOGICAL_TABLE_NAME, _cluster1, true), expectedTotal);
     assertEquals(getCount(LOGICAL_TABLE_NAME, _cluster2, true), expectedTotal);
   }
 
+  @Test
+  public void testLogicalFederationTwoLogicalTablesMSE() throws Exception {
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster1._controllerBaseApiUrl);
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster2._controllerBaseApiUrl);
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster1._controllerBaseApiUrl);
+    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster2._controllerBaseApiUrl);
+    setupFirstLogicalFederatedTable();
+    setupSecondLogicalFederatedTable();
+    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
+        LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
+        LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
+    cleanSegmentDirs();
+    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1), 
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
+    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2), 
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
+    loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1, 
1, SEGMENTS_PER_CLUSTER),
+        LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
+    loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2, 
2, SEGMENTS_PER_CLUSTER),
+        LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
+    String joinQuery = "SET useMultistageEngine=true; SET 
enableMultiClusterRouting=true; "
+        + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + 
LOGICAL_TABLE_NAME + " t1 "
+        + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " = 
t2." + JOIN_COLUMN + " "
+        + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
+    String result = executeQuery(joinQuery, _cluster1);
+    assertNotNull(result);
+    assertTrue(result.contains("resultTable"));
+    assertResultRows(result);
+  }
+
   @Override
   protected BaseBrokerStarter createBrokerStarter() {
     return new MultiClusterHelixBrokerStarter();
@@ -557,14 +590,21 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
 
   protected void createLogicalTableOnBothClusters(String logicalTableName,
       String cluster1PhysicalTable, String cluster2PhysicalTable) throws 
IOException {
-    Map<String, PhysicalTableConfig> physicalTableConfigMap = Map.of(
-        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
+    // For cluster 1: cluster1's table is local (isMultiCluster=false), 
cluster2's table is remote (isMultiCluster=true)
+    Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap = Map.of(
+        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false),
         cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
     );
 
-    createLogicalTable(SCHEMA_FILE, physicalTableConfigMap, DEFAULT_TENANT,
+    // For cluster 2: cluster2's table is local (isMultiCluster=false), 
cluster1's table is remote (isMultiCluster=true)
+    Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap = Map.of(
+        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
+        cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false)
+    );
+
+    createLogicalTable(SCHEMA_FILE, cluster1PhysicalTableConfigMap, 
DEFAULT_TENANT,
         _cluster1._controllerBaseApiUrl, logicalTableName, 
cluster1PhysicalTable + "_OFFLINE", null);
-    createLogicalTable(SCHEMA_FILE, physicalTableConfigMap, DEFAULT_TENANT,
+    createLogicalTable(SCHEMA_FILE, cluster2PhysicalTableConfigMap, 
DEFAULT_TENANT,
         _cluster2._controllerBaseApiUrl, logicalTableName, 
cluster2PhysicalTable + "_OFFLINE", null);
   }
 
@@ -576,10 +616,28 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
   }
 
+  protected void setupSecondLogicalFederatedTable() throws Exception {
+    setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
+  }
+
   protected void setupLogicalFederatedTable(String cluster1TableName, String 
cluster2TableName) throws Exception {
     dropTableAndSchemaIfExists(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
     dropTableAndSchemaIfExists(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
     createSchemaAndTableForCluster(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
     createSchemaAndTableForCluster(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
   }
+
+  protected void assertResultRows(String resultJson) throws Exception {
+    JsonNode rows = 
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
+    assertNotNull(rows);
+    for (JsonNode row : rows) {
+      int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
+      // Depending on the number of records with the same join key in each 
cluster, the expected count varies.
+      // If the number is less than the size of the smaller cluster, it should 
appear in both clusters,
+      // resulting in 4 records (2 from each cluster).
+      // Otherwise, it should appear only in one cluster, resulting in 1 
record.
+      int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1, 
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
+      assertEquals(row.get(1).asInt(), expectedCount);
+    }
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 77ba60e792c..9068de7a5ef 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -69,6 +69,7 @@ import org.apache.pinot.calcite.sql2rel.PinotConvertletTable;
 import org.apache.pinot.common.catalog.PinotCatalogReader;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.context.PlannerContext;
@@ -146,8 +147,13 @@ public class QueryEnvironment {
   private final Config _envConfig;
   private final PinotCatalog _catalog;
   private final Set<String> _defaultDisabledPlannerRules;
+  private final MultiClusterRoutingContext _multiClusterRoutingContext;
 
   public QueryEnvironment(Config config) {
+    this(config, null);
+  }
+
+  public QueryEnvironment(Config config, MultiClusterRoutingContext 
multiClusterRoutingContext) {
     _envConfig = config;
     String database = config.getDatabase();
     _catalog = new PinotCatalog(config.getTableCache(), database);
@@ -163,6 +169,7 @@ public class QueryEnvironment {
     _defaultDisabledPlannerRules = _envConfig.defaultDisabledPlannerRules();
     // default optProgram with no skip rule options and no use rule options
     _optProgram = getOptProgram(Set.of(), Set.of(), 
_defaultDisabledPlannerRules);
+    _multiClusterRoutingContext = multiClusterRoutingContext;
   }
 
   public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
@@ -509,7 +516,7 @@ public class QueryEnvironment {
     PinotDispatchPlanner pinotDispatchPlanner =
         new PinotDispatchPlanner(plannerContext, 
_envConfig.getWorkerManager(), _envConfig.getRequestId(),
             _envConfig.getTableCache());
-    return pinotDispatchPlanner.createDispatchableSubPlan(plan);
+    return pinotDispatchPlanner.createDispatchableSubPlan(plan, 
_multiClusterRoutingContext);
   }
 
   // --------------------------------------------------------------------------
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index aff6e0e8e9c..e01e80eab05 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -26,6 +26,7 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.core.routing.LogicalTableRouteInfo;
 import org.apache.pinot.core.routing.LogicalTableRouteProvider;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.EnrichedJoinNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -48,9 +49,11 @@ import org.apache.pinot.query.planner.plannode.WindowNode;
 public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, 
DispatchablePlanContext> {
   private final Set<MailboxSendNode> _visited = Collections.newSetFromMap(new 
IdentityHashMap<>());
   private final TableCache _tableCache;
+  private final MultiClusterRoutingContext _multiClusterRoutingContext;
 
-  public DispatchablePlanVisitor(TableCache tableCache) {
+  public DispatchablePlanVisitor(TableCache tableCache, 
MultiClusterRoutingContext multiClusterRoutingContext) {
     _tableCache = tableCache;
+    _multiClusterRoutingContext = multiClusterRoutingContext;
   }
 
   private static DispatchablePlanMetadata 
getOrCreateDispatchablePlanMetadata(PlanNode node,
@@ -156,7 +159,7 @@ public class DispatchablePlanVisitor implements 
PlanNodeVisitor<Void, Dispatchab
     if (tableName == null) {
       tableName = _tableCache.getActualLogicalTableName(tableNameInNode);
       Preconditions.checkNotNull(tableName, "Logical table config not found in 
table cache: " + tableNameInNode);
-      LogicalTableRouteProvider tableRouteProvider = new 
LogicalTableRouteProvider();
+      LogicalTableRouteProvider tableRouteProvider = new 
LogicalTableRouteProvider(_multiClusterRoutingContext);
       LogicalTableRouteInfo logicalTableRouteInfo = new 
LogicalTableRouteInfo();
       tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo, 
tableName, _tableCache);
       dispatchablePlanMetadata.setLogicalTableRouteInfo(logicalTableRouteInfo);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index 656d0dc5b93..6cb3e51bf6c 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.SubPlan;
@@ -52,7 +53,7 @@ public class PinotDispatchPlanner {
    * Entry point for attaching dispatch metadata to a {@link SubPlan}.
    * @param subPlan the entrypoint of the sub plan.
    */
-  public DispatchableSubPlan createDispatchableSubPlan(SubPlan subPlan) {
+  public DispatchableSubPlan createDispatchableSubPlan(SubPlan subPlan, 
MultiClusterRoutingContext routingContext) {
     // perform physical plan conversion and assign workers to each stage.
     // metadata may come directly from Calcite's RelNode which has not 
resolved actual table names (taking
     // case-sensitivity into account) yet, so we need to ensure table names 
are resolved while creating the subplan.
@@ -62,7 +63,7 @@ public class PinotDispatchPlanner {
     PlanFragment rootFragment = subPlan.getSubPlanRoot();
     PlanNode rootNode = rootFragment.getFragmentRoot();
     // 1. start by visiting the sub plan fragment root.
-    rootNode.visit(new DispatchablePlanVisitor(_tableCache), context);
+    rootNode.visit(new DispatchablePlanVisitor(_tableCache, routingContext), 
context);
     // 2. add a special stage for the global mailbox receive, this runs on the 
dispatcher.
     context.getDispatchablePlanStageRootMap().put(0, rootNode);
     // 3. add worker assignment after the dispatchable plan context is 
fulfilled after the visit.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to