This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c899956b2d0 [federation] Introducing multi-cluster routing support for physical optimizer (#17516)
c899956b2d0 is described below
commit c899956b2d084dcad7aa4e7cd5515edb69f0ac9d
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Jan 16 14:26:26 2026 -0800
[federation] Introducing multi-cluster routing support for physical optimizer (#17516)
* [federation] Introducing multi-cluster routing support for physical optimizer
* Renamed RunInBroker to MSELite in tests
---
.../multicluster/MultiClusterIntegrationTest.java | 107 ++++++++++++---------
.../org/apache/pinot/query/QueryEnvironment.java | 3 +-
.../query/context/PhysicalPlannerContext.java | 15 ++-
.../opt/rules/LeafStageWorkerAssignmentRule.java | 4 +-
.../v2/validation/LiteModeJoinValidationTest.java | 3 +-
5 files changed, 79 insertions(+), 53 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index dc434cfd75a..ad2cdbc72a3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -65,6 +65,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeGroups;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -200,36 +201,8 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
LOGGER.info("Multi-cluster broker test passed: both clusters started and
queryable");
}
- @Test(dataProvider = "brokerModes")
- public void testLogicalFederationTwoOfflineTablesSSE(int brokerPort, boolean
expectUnavailableException)
- throws Exception {
- LOGGER.info("Testing SSE on broker port {}
(expectUnavailableException={})",
- brokerPort, expectUnavailableException);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster1._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster2._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster1._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster2._controllerBaseApiUrl);
- setupFirstLogicalFederatedTable();
- createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE,
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
- cleanSegmentDirs();
- _cluster1AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_1, 1);
- _cluster2AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_2, 2);
- loadDataIntoCluster(_cluster1AvroFiles,
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
- loadDataIntoCluster(_cluster2AvroFiles,
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
- long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
-
- String query = "SET enableMultiClusterRouting=true; SELECT COUNT(*) as
count FROM " + LOGICAL_TABLE_NAME;
- String result = executeQueryOnBrokerPort(query, brokerPort);
- assertEquals(parseCountResult(result), expectedTotal);
- verifyUnavailableClusterException(result, expectUnavailableException);
- }
-
- @Test(dataProvider = "brokerModes")
- public void testLogicalFederationTwoLogicalTablesMSE(int brokerPort, boolean
expectUnavailableException)
- throws Exception {
- LOGGER.info("Testing MSE on broker port {}
(expectUnavailableException={})",
- brokerPort, expectUnavailableException);
+ @BeforeGroups("query")
+ public void setupTablesForQueryTests() throws Exception {
dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster1._controllerBaseApiUrl);
dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster2._controllerBaseApiUrl);
dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster1._controllerBaseApiUrl);
@@ -237,35 +210,73 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
setupFirstLogicalFederatedTable();
setupSecondLogicalFederatedTable();
createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE,
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
+ LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE_2,
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
+ LOGICAL_FEDERATION_CLUSTER_1_TABLE_2,
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
cleanSegmentDirs();
loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1),
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2),
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1,
1, SEGMENTS_PER_CLUSTER),
- LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
+ LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2,
2, SEGMENTS_PER_CLUSTER),
- LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
- String joinQuery = "SET useMultistageEngine=true; SET
enableMultiClusterRouting=true; "
- + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME + " t1 "
- + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " =
t2." + JOIN_COLUMN + " "
- + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
- String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
- assertNotNull(result);
- assertTrue(result.contains("resultTable"));
- assertResultRows(result);
- verifyUnavailableClusterException(result, expectUnavailableException);
+ LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
+ }
+
+ @Test(dataProvider = "queryModes", groups = "query")
+ public void testLogicalFederationQueries(String testName, String
queryOptions, boolean isJoinQuery,
+ int brokerPort, boolean expectUnavailableException)
+ throws Exception {
+ LOGGER.info("Running {} on broker port {} (expectUnavailableException={})",
+ testName, brokerPort, expectUnavailableException);
+ long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
+
+ if (isJoinQuery) {
+ // Join query test
+ String joinQuery = queryOptions
+ + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME + " t1 "
+ + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " =
t2." + JOIN_COLUMN + " "
+ + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
+ String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
+ assertNotNull(result);
+ assertTrue(result.contains("resultTable"), "Expected resultTable in
response: " + result);
+ assertResultRows(result);
+ verifyUnavailableClusterException(result, expectUnavailableException);
+ }
+
+ // Count query test (all modes)
+ String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME;
+ String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
+ assertEquals(parseCountResult(countResult), expectedTotal);
+ verifyUnavailableClusterException(countResult, expectUnavailableException);
}
/**
- * Data provider for broker modes: normal broker vs broker with unavailable
remote cluster.
+ * Data provider for all query mode combinations: broker mode x query
engine/options.
+ * Each test case has: testName, queryOptions, isJoinQuery, brokerPort,
expectUnavailableException
*/
- @DataProvider(name = "brokerModes")
- public Object[][] brokerModes() {
+ @DataProvider(name = "queryModes")
+ public Object[][] queryModes() {
+ int normalBroker = _cluster1._brokerPort;
+ int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
+
+ String sseOpts = "SET enableMultiClusterRouting=true; ";
+ String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
+ String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
+ String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
+
return new Object[][]{
- {_cluster1._brokerPort, false}, // Normal broker - all clusters
connected
- {_brokerWithUnavailableCluster._brokerPort, true} // Broker with
unavailable cluster
+ // SSE tests (count only)
+ {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
+ {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
+ // MSE tests (join + count)
+ {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
+ {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
+ // Physical optimizer tests (join + count)
+ {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker,
false},
+ {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true,
unavailableBroker, true},
+ // MSELiteMode tests (join + count)
+ {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
+ {"MSELiteMode-UnavailableBroker", mseLiteOpts, true,
unavailableBroker, true},
};
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 9068de7a5ef..7535cdcbca0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -230,7 +230,8 @@ public class QueryEnvironment {
workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
_envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(),
_envConfig.defaultUseBrokerPruning(),
_envConfig.defaultLiteModeLeafStageLimit(),
_envConfig.defaultHashFunction(),
- _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit(),
_envConfig.defaultLiteModeEnableJoins());
+ _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit(),
_envConfig.defaultLiteModeEnableJoins(),
+ _multiClusterRoutingContext);
}
return new PlannerContext(_config, _catalogReader, _typeFactory,
optProgram, traitProgram,
sqlNodeAndOptions.getOptions(), _envConfig, format,
physicalPlannerContext);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 3a7fafe9b53..816190ae719 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
@@ -71,6 +72,8 @@ public class PhysicalPlannerContext {
private final int _liteModeLeafStageFanOutAdjustedLimit;
private final DistHashFunction _defaultHashFunction;
private final boolean _liteModeJoinsEnabled;
+ @Nullable
+ private final MultiClusterRoutingContext _multiClusterRoutingContext;
/**
* Used by controller when it needs to extract table names from the query.
@@ -90,6 +93,7 @@ public class PhysicalPlannerContext {
_liteModeLeafStageFanOutAdjustedLimit =
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT;
_defaultHashFunction =
DistHashFunction.valueOf(KeySelector.DEFAULT_HASH_ALGORITHM.toUpperCase());
_liteModeJoinsEnabled =
CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
+ _multiClusterRoutingContext = null;
}
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
@@ -98,13 +102,14 @@ public class PhysicalPlannerContext {
CommonConstants.Broker.DEFAULT_USE_LITE_MODE,
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
KeySelector.DEFAULT_HASH_ALGORITHM,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
- CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS);
+ CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS, null);
}
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions, boolean
defaultUseLiteMode, boolean defaultRunInBroker,
boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit,
String defaultHashFunction,
- int defaultLiteModeLeafStageFanOutAdjustedLimit, boolean
defaultLiteModeEnableJoins) {
+ int defaultLiteModeLeafStageFanOutAdjustedLimit, boolean
defaultLiteModeEnableJoins,
+ @Nullable MultiClusterRoutingContext multiClusterRoutingContext) {
_routingManager = routingManager;
_hostName = hostName;
_port = port;
@@ -121,6 +126,7 @@ public class PhysicalPlannerContext {
_defaultHashFunction =
DistHashFunction.valueOf(defaultHashFunction.toUpperCase());
_instanceIdToQueryServerInstance.put(instanceId,
getBrokerQueryServerInstance());
_liteModeJoinsEnabled = defaultLiteModeEnableJoins;
+ _multiClusterRoutingContext = multiClusterRoutingContext;
}
public Supplier<Integer> getNodeIdGenerator() {
@@ -202,6 +208,11 @@ public class PhysicalPlannerContext {
return _defaultHashFunction;
}
+ @Nullable
+ public MultiClusterRoutingContext getMultiClusterRoutingContext() {
+ return _multiClusterRoutingContext;
+ }
+
private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 36ec3cc76b6..d0ae144ae5e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -584,7 +584,9 @@ public class LeafStageWorkerAssignmentRule extends
PRelOptRule {
Map<String, String> tableOptions = getTableOptions(tableScan.getHints());
// Step 1: Get LogicalTableRouteInfo using LogicalTableRouteProvider
- LogicalTableRouteProvider tableRouteProvider = new
LogicalTableRouteProvider();
+ // Use MultiClusterRoutingContext if available to support logical tables
with physical tables from remote clusters
+ LogicalTableRouteProvider tableRouteProvider =
+ new
LogicalTableRouteProvider(_physicalPlannerContext.getMultiClusterRoutingContext());
LogicalTableRouteInfo logicalTableRouteInfo = new LogicalTableRouteInfo();
tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo,
logicalTableName, _tableCache);
tableRouteProvider.fillRouteMetadata(logicalTableRouteInfo,
_routingManager);
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
index 698413c39fe..a9cb7ec12d6 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
@@ -49,7 +49,8 @@ public class LiteModeJoinValidationTest {
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
KeySelector.DEFAULT_HASH_ALGORITHM,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
- liteModeJoinsEnabled);
+ liteModeJoinsEnabled,
+ null);
}
private static PRelNode makeJoinPlan() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]