This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c899956b2d0 [federation] Introducing multi-cluster routing support for physical optimizer (#17516)
c899956b2d0 is described below

commit c899956b2d084dcad7aa4e7cd5515edb69f0ac9d
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Jan 16 14:26:26 2026 -0800

    [federation] Introducing multi-cluster routing support for physical optimizer (#17516)
    
    * [federation] Introducing multi-cluster routing support for physical optimizer
    
    * Renamed RunInBroker to MSELite in tests
---
 .../multicluster/MultiClusterIntegrationTest.java  | 107 ++++++++++++---------
 .../org/apache/pinot/query/QueryEnvironment.java   |   3 +-
 .../query/context/PhysicalPlannerContext.java      |  15 ++-
 .../opt/rules/LeafStageWorkerAssignmentRule.java   |   4 +-
 .../v2/validation/LiteModeJoinValidationTest.java  |   3 +-
 5 files changed, 79 insertions(+), 53 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index dc434cfd75a..ad2cdbc72a3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -65,6 +65,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeGroups;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -200,36 +201,8 @@ public class MultiClusterIntegrationTest extends ClusterTest {
     LOGGER.info("Multi-cluster broker test passed: both clusters started and queryable");
   }
 
-  @Test(dataProvider = "brokerModes")
-  public void testLogicalFederationTwoOfflineTablesSSE(int brokerPort, boolean expectUnavailableException)
-      throws Exception {
-    LOGGER.info("Testing SSE on broker port {} (expectUnavailableException={})",
-        brokerPort, expectUnavailableException);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, _cluster1._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, _cluster2._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, _cluster1._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, _cluster2._controllerBaseApiUrl);
-    setupFirstLogicalFederatedTable();
-    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
-        LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
-    cleanSegmentDirs();
-    _cluster1AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_1, 1);
-    _cluster2AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_2, 2);
-    loadDataIntoCluster(_cluster1AvroFiles, LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
-    loadDataIntoCluster(_cluster2AvroFiles, LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
-    long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
-
-    String query = "SET enableMultiClusterRouting=true; SELECT COUNT(*) as count FROM " + LOGICAL_TABLE_NAME;
-    String result = executeQueryOnBrokerPort(query, brokerPort);
-    assertEquals(parseCountResult(result), expectedTotal);
-    verifyUnavailableClusterException(result, expectUnavailableException);
-  }
-
-  @Test(dataProvider = "brokerModes")
-  public void testLogicalFederationTwoLogicalTablesMSE(int brokerPort, boolean expectUnavailableException)
-      throws Exception {
-    LOGGER.info("Testing MSE on broker port {} (expectUnavailableException={})",
-        brokerPort, expectUnavailableException);
+  @BeforeGroups("query")
+  public void setupTablesForQueryTests() throws Exception {
     dropLogicalTableIfExists(LOGICAL_TABLE_NAME, _cluster1._controllerBaseApiUrl);
     dropLogicalTableIfExists(LOGICAL_TABLE_NAME, _cluster2._controllerBaseApiUrl);
     dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, _cluster1._controllerBaseApiUrl);
@@ -237,35 +210,73 @@ public class MultiClusterIntegrationTest extends ClusterTest {
     setupFirstLogicalFederatedTable();
     setupSecondLogicalFederatedTable();
     createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
-        LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+      LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
     createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
-        LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
+      LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
     cleanSegmentDirs();
     loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1), LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
     loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2), LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
     loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1, 1, SEGMENTS_PER_CLUSTER),
-        LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
+      LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
     loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2, 2, SEGMENTS_PER_CLUSTER),
-        LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
-    String joinQuery = "SET useMultistageEngine=true; SET enableMultiClusterRouting=true; "
-        + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + LOGICAL_TABLE_NAME + " t1 "
-        + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " = t2." + JOIN_COLUMN + " "
-        + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
-    String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
-    assertNotNull(result);
-    assertTrue(result.contains("resultTable"));
-    assertResultRows(result);
-    verifyUnavailableClusterException(result, expectUnavailableException);
+      LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
+  }
+
+  @Test(dataProvider = "queryModes", groups = "query")
+  public void testLogicalFederationQueries(String testName, String queryOptions, boolean isJoinQuery,
+      int brokerPort, boolean expectUnavailableException)
+      throws Exception {
+    LOGGER.info("Running {} on broker port {} (expectUnavailableException={})",
+        testName, brokerPort, expectUnavailableException);
+    long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
+
+    if (isJoinQuery) {
+      // Join query test
+      String joinQuery = queryOptions
+          + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + LOGICAL_TABLE_NAME + " t1 "
+          + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " = t2." + JOIN_COLUMN + " "
+          + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
+      String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
+      assertNotNull(result);
+      assertTrue(result.contains("resultTable"), "Expected resultTable in response: " + result);
+      assertResultRows(result);
+      verifyUnavailableClusterException(result, expectUnavailableException);
+    }
+
+    // Count query test (all modes)
+    String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " + LOGICAL_TABLE_NAME;
+    String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
+    assertEquals(parseCountResult(countResult), expectedTotal);
+    verifyUnavailableClusterException(countResult, expectUnavailableException);
   }
 
   /**
-   * Data provider for broker modes: normal broker vs broker with unavailable remote cluster.
+   * Data provider for all query mode combinations: broker mode x query engine/options.
+   * Each test case has: testName, queryOptions, isJoinQuery, brokerPort, expectUnavailableException
    */
-  @DataProvider(name = "brokerModes")
-  public Object[][] brokerModes() {
+  @DataProvider(name = "queryModes")
+  public Object[][] queryModes() {
+    int normalBroker = _cluster1._brokerPort;
+    int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
+
+    String sseOpts = "SET enableMultiClusterRouting=true; ";
+    String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
+    String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
+    String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
+
     return new Object[][]{
-        {_cluster1._brokerPort, false},  // Normal broker - all clusters connected
-        {_brokerWithUnavailableCluster._brokerPort, true}  // Broker with unavailable cluster
+        // SSE tests (count only)
+        {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
+        {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
+        // MSE tests (join + count)
+        {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
+        {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
+        // Physical optimizer tests (join + count)
+        {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker, false},
+        {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true, unavailableBroker, true},
+        // MSELiteMode tests (join + count)
+        {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
+        {"MSELiteMode-UnavailableBroker", mseLiteOpts, true, unavailableBroker, true},
     };
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 9068de7a5ef..7535cdcbca0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -230,7 +230,8 @@ public class QueryEnvironment {
           workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
           _envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(), _envConfig.defaultUseBrokerPruning(),
           _envConfig.defaultLiteModeLeafStageLimit(), _envConfig.defaultHashFunction(),
-          _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit(), _envConfig.defaultLiteModeEnableJoins());
+          _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit(), _envConfig.defaultLiteModeEnableJoins(),
+          _multiClusterRoutingContext);
     }
     return new PlannerContext(_config, _catalogReader, _typeFactory, optProgram, traitProgram,
         sqlNodeAndOptions.getOptions(), _envConfig, format, physicalPlannerContext);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 3a7fafe9b53..816190ae719 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
@@ -71,6 +72,8 @@ public class PhysicalPlannerContext {
   private final int _liteModeLeafStageFanOutAdjustedLimit;
   private final DistHashFunction _defaultHashFunction;
   private final boolean _liteModeJoinsEnabled;
+  @Nullable
+  private final MultiClusterRoutingContext _multiClusterRoutingContext;
 
   /**
    * Used by controller when it needs to extract table names from the query.
@@ -90,6 +93,7 @@ public class PhysicalPlannerContext {
     _liteModeLeafStageFanOutAdjustedLimit = CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT;
     _defaultHashFunction = DistHashFunction.valueOf(KeySelector.DEFAULT_HASH_ALGORITHM.toUpperCase());
     _liteModeJoinsEnabled = CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
+    _multiClusterRoutingContext = null;
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId,
@@ -98,13 +102,14 @@ public class PhysicalPlannerContext {
         CommonConstants.Broker.DEFAULT_USE_LITE_MODE, CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
         CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING, CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
         KeySelector.DEFAULT_HASH_ALGORITHM, CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
-        CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS);
+        CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS, null);
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId,
       String instanceId, Map<String, String> queryOptions, boolean defaultUseLiteMode, boolean defaultRunInBroker,
       boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit, String defaultHashFunction,
-      int defaultLiteModeLeafStageFanOutAdjustedLimit, boolean defaultLiteModeEnableJoins) {
+      int defaultLiteModeLeafStageFanOutAdjustedLimit, boolean defaultLiteModeEnableJoins,
+      @Nullable MultiClusterRoutingContext multiClusterRoutingContext) {
     _routingManager = routingManager;
     _hostName = hostName;
     _port = port;
@@ -121,6 +126,7 @@ public class PhysicalPlannerContext {
     _defaultHashFunction = 
DistHashFunction.valueOf(defaultHashFunction.toUpperCase());
     _instanceIdToQueryServerInstance.put(instanceId, 
getBrokerQueryServerInstance());
     _liteModeJoinsEnabled = defaultLiteModeEnableJoins;
+    _multiClusterRoutingContext = multiClusterRoutingContext;
   }
 
   public Supplier<Integer> getNodeIdGenerator() {
@@ -202,6 +208,11 @@ public class PhysicalPlannerContext {
     return _defaultHashFunction;
   }
 
+  @Nullable
+  public MultiClusterRoutingContext getMultiClusterRoutingContext() {
+    return _multiClusterRoutingContext;
+  }
+
   private QueryServerInstance getBrokerQueryServerInstance() {
     return new QueryServerInstance(_instanceId, _hostName, _port, _port);
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 36ec3cc76b6..d0ae144ae5e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -584,7 +584,9 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
     Map<String, String> tableOptions = getTableOptions(tableScan.getHints());
 
     // Step 1: Get LogicalTableRouteInfo using LogicalTableRouteProvider
-    LogicalTableRouteProvider tableRouteProvider = new LogicalTableRouteProvider();
+    // Use MultiClusterRoutingContext if available to support logical tables with physical tables from remote clusters
+    LogicalTableRouteProvider tableRouteProvider =
+        new LogicalTableRouteProvider(_physicalPlannerContext.getMultiClusterRoutingContext());
     LogicalTableRouteInfo logicalTableRouteInfo = new LogicalTableRouteInfo();
     tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo, logicalTableName, _tableCache);
     tableRouteProvider.fillRouteMetadata(logicalTableRouteInfo, _routingManager);
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
index 698413c39fe..a9cb7ec12d6 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
@@ -49,7 +49,8 @@ public class LiteModeJoinValidationTest {
         CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
         KeySelector.DEFAULT_HASH_ALGORITHM,
         CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
-        liteModeJoinsEnabled);
+        liteModeJoinsEnabled,
+        null);
   }
 
   private static PRelNode makeJoinPlan() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to