This is an automated email from the ASF dual-hosted git repository.
shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b396056caf9 [federation] Disallow multi-cluster routing for physical
tables (#17731)
b396056caf9 is described below
commit b396056caf9cdd5c2a32b8790a46641908d73247
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Feb 25 10:46:17 2026 -0800
[federation] Disallow multi-cluster routing for physical tables (#17731)
This PR adds validation that prevents physical tables from being queried
with enableMultiClusterRouting=true, since multi-cluster federation is
supported only for logical tables. If a physical table is queried with
multi-cluster routing enabled, the broker now returns a QUERY_VALIDATION error
with a clear message.
---
.../requesthandler/BaseBrokerRequestHandler.java | 29 +
.../BaseSingleStageBrokerRequestHandler.java | 13 +
.../MultiStageBrokerRequestHandler.java | 1 +
...t.java => BaseMultiClusterIntegrationTest.java} | 393 +++++------
.../multicluster/MultiClusterIntegrationTest.java | 776 +++++----------------
.../SameTableNameMultiClusterIntegrationTest.java | 42 ++
6 files changed, 438 insertions(+), 816 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5b18088ccac..30d09fa2360 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -63,6 +63,7 @@ import
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -261,6 +262,34 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
return tableAuthorizationResult;
}
+ /**
+ * Validates that tables can be queried with enableMultiClusterRouting if
and only if they are logical tables.
+ * Physical tables are cluster-specific and cannot be federated across
clusters.
+ * Multi-cluster routing is only supported for logical tables.
+ *
+ * @param tableNames Set of table names to validate
+ * @param queryOptions Map of query options
+ * @throws QueryException if any physical table is queried with
enableMultiClusterRouting=true
+ */
+ protected void validatePhysicalTablesWithMultiClusterRouting(Set<String>
tableNames,
+ Map<String, String> queryOptions) {
+ Preconditions.checkNotNull(tableNames, "Table names cannot be null when
validating multi-cluster routing");
+ Preconditions.checkNotNull(queryOptions, "Query options cannot be null");
+ boolean isMultiClusterRoutingEnabled =
queryOptions.containsKey(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING)
+ &&
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING));
+
+ if (isMultiClusterRoutingEnabled) {
+ for (String tableName : tableNames) {
+ if (!_tableCache.isLogicalTable(tableName)) {
+ throw QueryErrorCode.QUERY_VALIDATION.asException(
+ "Physical table '" + tableName + "' cannot be queried with
enableMultiClusterRouting=true. "
+ + "Multi-cluster routing is only supported for logical tables. "
+ + "Please remove the enableMultiClusterRouting query option or
use a logical table instead.");
+ }
+ }
+ }
+ }
+
/**
* Returns true if the QPS quota of query tables, database or application
has been exceeded.
*/
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 62ddffa613b..9431ea311d3 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -115,6 +115,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.QueryFingerprint;
@@ -430,6 +431,18 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
LogicalTableConfig logicalTableConfig =
_tableCache.getLogicalTableConfig(rawTableName);
String database =
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
long compilationEndTimeNs = System.nanoTime();
+
+ // Validate that physical tables are not queried with multi-cluster
routing enabled.
+ // Unlike the MSE, the SSE has no centralized exception handler that
converts QueryException into
+ // BrokerResponseNative, so we must catch and convert here to return a
proper JSON error response.
+ try {
+ validatePhysicalTablesWithMultiClusterRouting(Set.of(tableName),
sqlNodeAndOptions.getOptions());
+ } catch (QueryException e) {
+ LOGGER.warn("Request {}: {}", requestId, e.getMessage());
+ requestContext.setErrorCode(e.getErrorCode());
+ return new BrokerResponseNative(e.getErrorCode(), e.getMessage());
+ }
+
// full request compile time = compilationTimeNs + parserTimeNs
_brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.REQUEST_COMPILATION,
(compilationEndTimeNs - compilationStartTimeNs) +
sqlNodeAndOptions.getParseTimeNs());
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f589d90ee96..8d8afe0598a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -358,6 +358,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
try (QueryThreadContext ignore = QueryThreadContext.open(executionContext,
mseWorkerInfo, _threadAccountant);
QueryEnvironment.CompiledQuery compiledQuery = compileQuery(requestId,
query, sqlNodeAndOptions, requestContext,
httpHeaders, queryTimer)) {
+
validatePhysicalTablesWithMultiClusterRouting(compiledQuery.getTableNames(),
compiledQuery.getOptions());
AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
checkAuthorization(requesterIdentity, requestContext, httpHeaders,
compiledQuery, rlsFiltersApplied);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/BaseMultiClusterIntegrationTest.java
similarity index 73%
copy from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
copy to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/BaseMultiClusterIntegrationTest.java
index ad2cdbc72a3..ac24426a058 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/BaseMultiClusterIntegrationTest.java
@@ -65,31 +65,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeGroups;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-public class MultiClusterIntegrationTest extends ClusterTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
+/**
+ * Base class for multi-cluster integration tests. Contains common
setup/teardown logic,
+ * utility methods, and helper functions for managing multi-cluster test
environments.
+ */
+public abstract class BaseMultiClusterIntegrationTest extends ClusterTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BaseMultiClusterIntegrationTest.class);
protected static final String SCHEMA_FILE =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
protected static final String TIME_COLUMN = "DaysSinceEpoch";
- // TODO: N clusters instead of 2 in future iterations.
protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1";
protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
protected static final ClusterConfig CLUSTER_1_CONFIG = new
ClusterConfig(CLUSTER_1_NAME, 30000);
protected static final ClusterConfig CLUSTER_2_CONFIG = new
ClusterConfig(CLUSTER_2_NAME, 40000);
protected static final String DEFAULT_TENANT = "DefaultTenant";
- protected static final String LOGICAL_TABLE_NAME = "logical_table";
- protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
- protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE =
"logical_federation_table_cluster1";
- protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE =
"logical_federation_table_cluster2";
- protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE_2 =
"logical_federation_table2_cluster1";
- protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE_2 =
"logical_federation_table2_cluster2";
protected static final int TABLE_SIZE_CLUSTER_1 = 1500;
protected static final int TABLE_SIZE_CLUSTER_2 = 1000;
protected static final int SEGMENTS_PER_CLUSTER = 3;
@@ -105,7 +99,7 @@ public class MultiClusterIntegrationTest extends ClusterTest
{
@BeforeClass
public void setUp() throws Exception {
- LOGGER.info("Setting up MultiClusterIntegrationTest");
+ LOGGER.info("Setting up BaseMultiClusterIntegrationTest");
// Initialize cluster components
_cluster1 = new ClusterComponents();
@@ -123,14 +117,13 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
startControllerInit(_cluster2, CLUSTER_2_CONFIG);
// Start brokers and servers for both clusters
- // Note: Each cluster's broker is configured to know about the other
cluster as remote
startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
// Start an alternate broker with one valid and one unavailable remote
cluster
startBrokerWithUnavailableCluster();
- LOGGER.info("MultiClusterIntegrationTest setup complete");
+ LOGGER.info("BaseMultiClusterIntegrationTest setup complete");
}
/**
@@ -161,125 +154,6 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
LOGGER.info("Started broker with unavailable cluster on port {}",
_brokerWithUnavailableCluster._brokerPort);
}
- // TODO: Add more tests for cross-cluster queries in subsequent iterations.
- @Test
- public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
- LOGGER.info("Testing that multi-cluster broker starts successfully and is
queryable");
-
- // Verify both clusters' brokers are running
(MultiClusterHelixBrokerStarter)
- assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be
started");
- assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be
started");
- assertTrue(_cluster1._brokerStarter instanceof
MultiClusterHelixBrokerStarter,
- "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
- assertTrue(_cluster2._brokerStarter instanceof
MultiClusterHelixBrokerStarter,
- "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
-
- // Setup a test table on both clusters
- String testTableName = "multicluster_test_table";
- createSchemaAndTableOnBothClusters(testTableName);
-
- // Create and load test data into both clusters
- _cluster1AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_1, 1);
- _cluster2AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_2, 2);
-
- loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
- loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
-
- // Verify cluster 1 is queryable
- String query = "SELECT COUNT(*) FROM " + testTableName;
- String result1 = executeQuery(query, _cluster1);
- assertNotNull(result1, "Query result from cluster 1 should not be null");
- long count1 = parseCountResult(result1);
- assertEquals(count1, TABLE_SIZE_CLUSTER_1);
-
- // Verify cluster 2 is queryable
- String result2 = executeQuery(query, _cluster2);
- assertNotNull(result2, "Query result from cluster 2 should not be null");
- long count2 = parseCountResult(result2);
- assertEquals(count2, TABLE_SIZE_CLUSTER_2);
-
- LOGGER.info("Multi-cluster broker test passed: both clusters started and
queryable");
- }
-
- @BeforeGroups("query")
- public void setupTablesForQueryTests() throws Exception {
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster1._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster2._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster1._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster2._controllerBaseApiUrl);
- setupFirstLogicalFederatedTable();
- setupSecondLogicalFederatedTable();
- createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
- createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE_2,
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
- cleanSegmentDirs();
- loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1),
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
- loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2),
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
- loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1,
1, SEGMENTS_PER_CLUSTER),
- LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
- loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2,
2, SEGMENTS_PER_CLUSTER),
- LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
- }
-
- @Test(dataProvider = "queryModes", groups = "query")
- public void testLogicalFederationQueries(String testName, String
queryOptions, boolean isJoinQuery,
- int brokerPort, boolean expectUnavailableException)
- throws Exception {
- LOGGER.info("Running {} on broker port {} (expectUnavailableException={})",
- testName, brokerPort, expectUnavailableException);
- long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
-
- if (isJoinQuery) {
- // Join query test
- String joinQuery = queryOptions
- + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME + " t1 "
- + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " =
t2." + JOIN_COLUMN + " "
- + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
- String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
- assertNotNull(result);
- assertTrue(result.contains("resultTable"), "Expected resultTable in
response: " + result);
- assertResultRows(result);
- verifyUnavailableClusterException(result, expectUnavailableException);
- }
-
- // Count query test (all modes)
- String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME;
- String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
- assertEquals(parseCountResult(countResult), expectedTotal);
- verifyUnavailableClusterException(countResult, expectUnavailableException);
- }
-
- /**
- * Data provider for all query mode combinations: broker mode x query
engine/options.
- * Each test case has: testName, queryOptions, isJoinQuery, brokerPort,
expectUnavailableException
- */
- @DataProvider(name = "queryModes")
- public Object[][] queryModes() {
- int normalBroker = _cluster1._brokerPort;
- int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
-
- String sseOpts = "SET enableMultiClusterRouting=true; ";
- String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
- String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
- String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
-
- return new Object[][]{
- // SSE tests (count only)
- {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
- {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
- // MSE tests (join + count)
- {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
- {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
- // Physical optimizer tests (join + count)
- {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker,
false},
- {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true,
unavailableBroker, true},
- // MSELiteMode tests (join + count)
- {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
- {"MSELiteMode-UnavailableBroker", mseLiteOpts, true,
unavailableBroker, true},
- };
- }
-
@Override
protected BaseBrokerStarter createBrokerStarter() {
return new MultiClusterHelixBrokerStarter();
@@ -310,6 +184,8 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
File _tarDir;
}
+ // ========== Cluster Setup Methods ==========
+
protected void setupDirectories() throws Exception {
setupClusterDirectories(_cluster1, "cluster1");
setupClusterDirectories(_cluster2, "cluster2");
@@ -406,6 +282,8 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
}
}
+ // ========== Data Generation Methods ==========
+
protected List<File> createAvroData(int dataSize, int clusterId) throws
Exception {
return createAvroDataMultipleSegments(dataSize, clusterId, 1);
}
@@ -473,6 +351,8 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
}
}
+ // ========== Data Loading Methods ==========
+
protected void loadDataIntoCluster(List<File> avroFiles, String tableName,
ClusterComponents cluster)
throws Exception {
cleanDirectories(cluster._segmentDir, cluster._tarDir);
@@ -517,6 +397,7 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
Thread.sleep(3000);
}
+ // ========== Schema and Table Management Methods ==========
protected void createSchemaAndTableForCluster(String tableName, String
controllerBaseApiUrl) throws IOException {
Schema schema = createSchema(SCHEMA_FILE);
@@ -563,6 +444,36 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
assertNotNull(response);
}
+ // ========== Logical Table Management Methods ==========
+
+ protected void createLogicalTable(String schemaFile,
+ Map<String, PhysicalTableConfig> physicalTableConfigMap, String
brokerTenant, String controllerBaseApiUrl,
+ String logicalTable, String refOfflineTable, String refRealtimeTable)
throws IOException {
+ ControllerRequestURLBuilder urlBuilder =
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
+ ControllerRequestClient client = new ControllerRequestClient(urlBuilder,
getHttpClient(),
+ getControllerRequestClientHeaders());
+ Schema schema = createSchema(schemaFile);
+ schema.setSchemaName(logicalTable);
+ client.addSchema(schema);
+ LogicalTableConfig config = new LogicalTableConfigBuilder()
+ .setTableName(logicalTable)
+ .setBrokerTenant(brokerTenant)
+ .setRefOfflineTableName(refOfflineTable)
+ .setRefRealtimeTableName(refRealtimeTable)
+ .setPhysicalTableConfigMap(physicalTableConfigMap)
+ .build();
+ String response =
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
+ config.toSingleLineJsonString(), Map.of());
+ assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" +
logicalTable
+ + " logical table successfully added.\"}");
+ }
+
+ protected void dropLogicalTableIfExists(String logicalTableName, String
controllerBaseApiUrl) {
+ dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
+ }
+
+ // ========== Query Execution Methods ==========
+
protected String executeQuery(String query, ClusterComponents cluster)
throws Exception {
return executeQueryOnBrokerPort(query, cluster._brokerPort);
}
@@ -573,6 +484,49 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
return ControllerTest.sendPostRequest(url,
JsonUtils.objectToPrettyString(payload));
}
+ protected long getCount(String tableName, ClusterComponents cluster, boolean
enableMultiClusterRouting)
+ throws Exception {
+ String query = "SET enableMultiClusterRouting=" +
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
+ + tableName;
+ return parseCountResult(executeQuery(query, cluster));
+ }
+
+ // ========== Result Parsing and Validation Methods ==========
+
+ /**
+ * Helper to verify physical table validation error when
enableMultiClusterRouting=true is used.
+ */
+ protected void assertPhysicalTableValidationError(JsonNode exceptions) {
+ assertNotNull(exceptions, "Expected validation error for physical table
with enableMultiClusterRouting=true");
+ assertTrue(exceptions.size() > 0, "Expected validation error exceptions");
+
+ boolean foundValidationError = false;
+ for (JsonNode ex : exceptions) {
+ String message = ex.get("message").asText();
+ int errorCode = ex.get("errorCode").asInt();
+ if (errorCode == 700 && message.contains("Physical table")
+ && message.contains("cannot be queried with
enableMultiClusterRouting=true")) {
+ foundValidationError = true;
+ break;
+ }
+ }
+ assertTrue(foundValidationError,
+ "Expected validation error stating physical tables cannot be queried
with enableMultiClusterRouting=true");
+ }
+
+ /**
+ * Helper to verify no federation-related exceptions are present.
+ */
+ protected void assertNoFederationExceptions(JsonNode exceptions) {
+ if (exceptions != null && exceptions.size() > 0) {
+ for (JsonNode ex : exceptions) {
+ String message = ex.get("message").asText();
+ assertTrue(!message.toLowerCase().contains("remote") &&
!message.toLowerCase().contains("federation"),
+ "Physical table queries should not have remote/federation
exceptions: " + message);
+ }
+ }
+ }
+
protected void verifyUnavailableClusterException(String result, boolean
expectException) throws Exception {
if (expectException) {
assertTrue(result.contains(UNAVAILABLE_CLUSTER_NAME),
@@ -607,12 +561,94 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
return 0;
}
+ protected void assertResultRows(String resultJson) throws Exception {
+ JsonNode rows =
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
+ assertNotNull(rows);
+ for (JsonNode row : rows) {
+ int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
+ // Depending on the number of records with the same join key in each
cluster, the expected count varies.
+ // If the number is less than the size of the smaller cluster, it should
appear in both clusters,
+ // resulting in 4 records (2 from each cluster).
+ // Otherwise, it should appear only in one cluster, resulting in 1
record.
+ int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1,
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
+ assertEquals(row.get(1).asInt(), expectedCount);
+ }
+ }
+
protected Schema createSchema(String schemaFileName) throws IOException {
InputStream schemaInputStream =
getClass().getClassLoader().getResourceAsStream(schemaFileName);
assertNotNull(schemaInputStream, "Schema file not found: " +
schemaFileName);
return Schema.fromInputStream(schemaInputStream);
}
+ // ========== Query Option Helpers ==========
+
+ /**
+ * Builds query option string based on the provided flags.
+ */
+ protected String buildQueryOptions(boolean enableMultiClusterRouting,
boolean useMultistageEngine,
+ boolean usePhysicalOptimizer, boolean runInBroker) {
+ StringBuilder opts = new StringBuilder();
+ if (enableMultiClusterRouting) {
+ opts.append("SET enableMultiClusterRouting=true; ");
+ }
+ if (useMultistageEngine) {
+ opts.append("SET useMultistageEngine=true; ");
+ }
+ if (usePhysicalOptimizer) {
+ opts.append("SET usePhysicalOptimizer=true; ");
+ }
+ if (runInBroker) {
+ opts.append("SET runInBroker=true; ");
+ }
+ return opts.toString();
+ }
+
+ /**
+ * Common test logic for verifying physical tables always query local
cluster only.
+ * Physical tables should be rejected when enableMultiClusterRouting=true,
and should
+ * return local-only data otherwise.
+ *
+ * @param physicalTableName The physical table name (with _OFFLINE suffix)
+ * @param queryOptions Query options string
+ * @param brokerPort Broker port to query
+ * @param expectValidationError Whether to expect a validation error
+ * @param testName Test name for logging
+ */
+ protected void verifyPhysicalTableLocalOnly(String physicalTableName, String
queryOptions, int brokerPort,
+ boolean expectValidationError, String testName) throws Exception {
+ LOGGER.info("Running {} on broker port {} - physical tables should always
query local cluster only",
+ testName, brokerPort);
+
+ String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " +
physicalTableName;
+ String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
+
+ JsonNode resultJson = JsonMapper.builder().build().readTree(countResult);
+ JsonNode exceptions = resultJson.get("exceptions");
+
+ if (expectValidationError) {
+ assertPhysicalTableValidationError(exceptions);
+ LOGGER.info("Verified {} correctly rejected physical table query with
enableMultiClusterRouting=true", testName);
+ } else {
+ long expectedLocalCount = TABLE_SIZE_CLUSTER_1;
+ long actualCount = parseCountResult(countResult);
+
+ assertEquals(actualCount, expectedLocalCount,
+ "Physical table should always return local cluster count");
+
+ assertNoFederationExceptions(exceptions);
+
+ LOGGER.info("Verified {} returned only local cluster data for physical
table (count={})",
+ testName, actualCount);
+ }
+ }
+
+ // ========== Cleanup Methods ==========
+
+ protected void cleanSegmentDirs() {
+ cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir,
_cluster2._segmentDir, _cluster2._tarDir);
+ }
+
@AfterClass
public void tearDown() throws Exception {
// Stop the alternate broker with unavailable cluster
@@ -649,93 +685,4 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
LOGGER.warn("Error stopping cluster", e);
}
}
-
- protected void cleanSegmentDirs() {
- cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir,
_cluster2._segmentDir, _cluster2._tarDir);
- }
-
- protected long getCount(String tableName, ClusterComponents cluster, boolean
enableMultiClusterRouting)
- throws Exception {
- String query = "SET enableMultiClusterRouting=" +
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
- + tableName;
- return parseCountResult(executeQuery(query, cluster));
- }
-
- /*
- Logical table helper methods
- */
- protected void createLogicalTable(String schemaFile,
- Map<String, PhysicalTableConfig> physicalTableConfigMap, String
brokerTenant, String controllerBaseApiUrl,
- String logicalTable, String refOfflineTable, String refRealtimeTable)
throws IOException {
- ControllerRequestURLBuilder urlBuilder =
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
- ControllerRequestClient client = new ControllerRequestClient(urlBuilder,
getHttpClient(),
- getControllerRequestClientHeaders());
- Schema schema = createSchema(schemaFile);
- schema.setSchemaName(logicalTable);
- client.addSchema(schema);
- LogicalTableConfig config = new LogicalTableConfigBuilder()
- .setTableName(logicalTable)
- .setBrokerTenant(brokerTenant)
- .setRefOfflineTableName(refOfflineTable)
- .setRefRealtimeTableName(refRealtimeTable)
- .setPhysicalTableConfigMap(physicalTableConfigMap)
- .build();
- String response =
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
- config.toSingleLineJsonString(), Map.of());
- assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" +
logicalTable
- + " logical table successfully added.\"}");
- }
-
- protected void createLogicalTableOnBothClusters(String logicalTableName,
- String cluster1PhysicalTable, String cluster2PhysicalTable) throws
IOException {
- // For cluster 1: cluster1's table is local (isMultiCluster=false),
cluster2's table is remote (isMultiCluster=true)
- Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap = Map.of(
- cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false),
- cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
- );
-
- // For cluster 2: cluster2's table is local (isMultiCluster=false),
cluster1's table is remote (isMultiCluster=true)
- Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap = Map.of(
- cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
- cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false)
- );
-
- createLogicalTable(SCHEMA_FILE, cluster1PhysicalTableConfigMap,
DEFAULT_TENANT,
- _cluster1._controllerBaseApiUrl, logicalTableName,
cluster1PhysicalTable + "_OFFLINE", null);
- createLogicalTable(SCHEMA_FILE, cluster2PhysicalTableConfigMap,
DEFAULT_TENANT,
- _cluster2._controllerBaseApiUrl, logicalTableName,
cluster2PhysicalTable + "_OFFLINE", null);
- }
-
- protected void dropLogicalTableIfExists(String logicalTableName, String
controllerBaseApiUrl) {
- dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
- }
-
- protected void setupFirstLogicalFederatedTable() throws Exception {
- setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE,
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
- }
-
- protected void setupSecondLogicalFederatedTable() throws Exception {
- setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE_2,
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
- }
-
- protected void setupLogicalFederatedTable(String cluster1TableName, String
cluster2TableName) throws Exception {
- dropTableAndSchemaIfExists(cluster1TableName,
_cluster1._controllerBaseApiUrl);
- dropTableAndSchemaIfExists(cluster2TableName,
_cluster2._controllerBaseApiUrl);
- createSchemaAndTableForCluster(cluster1TableName,
_cluster1._controllerBaseApiUrl);
- createSchemaAndTableForCluster(cluster2TableName,
_cluster2._controllerBaseApiUrl);
- }
-
- protected void assertResultRows(String resultJson) throws Exception {
- JsonNode rows =
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
- assertNotNull(rows);
- for (JsonNode row : rows) {
- int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
- // Depending on the number of records with the same join key in each
cluster, the expected count varies.
- // If the number is less than the size of the smaller cluster, it should
appear in both clusters,
- // resulting in 4 records (2 from each cluster).
- // Otherwise, it should appear only in one cluster, resulting in 1
record.
- int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1,
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
- assertEquals(row.get(1).asInt(), expectedCount);
- }
- }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index ad2cdbc72a3..452e3d1b4b6 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -20,51 +20,11 @@ package org.apache.pinot.integration.tests.multicluster;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
-import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
-import org.apache.pinot.broker.broker.helix.MultiClusterHelixBrokerStarter;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.BaseControllerStarter;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestClient;
-import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.ClusterTest;
-import org.apache.pinot.server.starter.helix.BaseServerStarter;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.CommonConstants.Broker;
-import org.apache.pinot.spi.utils.CommonConstants.Helix;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
-import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeGroups;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -73,95 +33,39 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-public class MultiClusterIntegrationTest extends ClusterTest {
+/**
+ * Integration tests for multi-cluster routing and federation.
+ * Tests federation with different table names across clusters and various
query modes.
+ */
+public class MultiClusterIntegrationTest extends
BaseMultiClusterIntegrationTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
- protected static final String SCHEMA_FILE =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
- protected static final String TIME_COLUMN = "DaysSinceEpoch";
- // TODO: N clusters instead of 2 in future iterations.
- protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1";
- protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
- protected static final ClusterConfig CLUSTER_1_CONFIG = new
ClusterConfig(CLUSTER_1_NAME, 30000);
- protected static final ClusterConfig CLUSTER_2_CONFIG = new
ClusterConfig(CLUSTER_2_NAME, 40000);
- protected static final String DEFAULT_TENANT = "DefaultTenant";
- protected static final String LOGICAL_TABLE_NAME = "logical_table";
- protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
- protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE =
"logical_federation_table_cluster1";
- protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE =
"logical_federation_table_cluster2";
- protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE_2 =
"logical_federation_table2_cluster1";
- protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE_2 =
"logical_federation_table2_cluster2";
- protected static final int TABLE_SIZE_CLUSTER_1 = 1500;
- protected static final int TABLE_SIZE_CLUSTER_2 = 1000;
- protected static final int SEGMENTS_PER_CLUSTER = 3;
- protected static final String JOIN_COLUMN = "OriginCityName";
- protected static final String UNAVAILABLE_CLUSTER_NAME =
"UnavailableCluster";
- protected static final String UNAVAILABLE_ZK_ADDRESS = "localhost:29999";
-
- protected ClusterComponents _cluster1;
- protected ClusterComponents _cluster2;
- protected ClusterComponents _brokerWithUnavailableCluster;
- protected List<File> _cluster1AvroFiles;
- protected List<File> _cluster2AvroFiles;
-
- @BeforeClass
- public void setUp() throws Exception {
- LOGGER.info("Setting up MultiClusterIntegrationTest");
-
- // Initialize cluster components
- _cluster1 = new ClusterComponents();
- _cluster2 = new ClusterComponents();
-
- // Setup directories
- setupDirectories();
-
- // Start ZooKeeper instances for both clusters
- startZookeeper(_cluster1);
- startZookeeper(_cluster2);
-
- // Start controllers for both clusters
- startControllerInit(_cluster1, CLUSTER_1_CONFIG);
- startControllerInit(_cluster2, CLUSTER_2_CONFIG);
-
- // Start brokers and servers for both clusters
- // Note: Each cluster's broker is configured to know about the other
cluster as remote
- startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
- startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
-
- // Start an alternate broker with one valid and one unavailable remote
cluster
- startBrokerWithUnavailableCluster();
-
- LOGGER.info("MultiClusterIntegrationTest setup complete");
+ // Logical tables are the federated tables that span across clusters
+ protected String getLogicalTableName() {
+ return "federated_table1";
}
- /**
- * Starts a broker configured with cluster2 (valid) and an unavailable
cluster (invalid ZK).
- */
- private void startBrokerWithUnavailableCluster() throws Exception {
- _brokerWithUnavailableCluster = new ClusterComponents();
- _brokerWithUnavailableCluster._brokerPort = findAvailablePort(55000);
-
- PinotConfiguration brokerConfig = new PinotConfiguration();
- brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER,
_cluster1._zkUrl);
- brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, CLUSTER_1_NAME);
- brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME,
ControllerTest.LOCAL_HOST);
- brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT,
_brokerWithUnavailableCluster._brokerPort);
- brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
- brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
- brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
- brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES,
- CLUSTER_2_NAME + "," + UNAVAILABLE_CLUSTER_NAME);
-
brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS,
CLUSTER_2_NAME),
- _cluster2._zkUrl);
-
brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS,
UNAVAILABLE_CLUSTER_NAME),
- UNAVAILABLE_ZK_ADDRESS);
-
- _brokerWithUnavailableCluster._brokerStarter = createBrokerStarter();
- _brokerWithUnavailableCluster._brokerStarter.init(brokerConfig);
- _brokerWithUnavailableCluster._brokerStarter.start();
- LOGGER.info("Started broker with unavailable cluster on port {}",
_brokerWithUnavailableCluster._brokerPort);
+ protected String getLogicalTableName2() {
+ return "federated_table2";
+ }
+
+ // Physical tables are the actual tables in each cluster
+ protected String getPhysicalTable1InCluster1() {
+ return "physical_table1_c1";
+ }
+
+ protected String getPhysicalTable1InCluster2() {
+ return "physical_table1_c2";
+ }
+
+ protected String getPhysicalTable2InCluster1() {
+ return "physical_table2_c1";
+ }
+
+ protected String getPhysicalTable2InCluster2() {
+ return "physical_table2_c2";
}
- // TODO: Add more tests for cross-cluster queries in subsequent iterations.
@Test
public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
LOGGER.info("Testing that multi-cluster broker starts successfully and is
queryable");
@@ -169,10 +73,6 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
// Verify both clusters' brokers are running
(MultiClusterHelixBrokerStarter)
assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be
started");
assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be
started");
- assertTrue(_cluster1._brokerStarter instanceof
MultiClusterHelixBrokerStarter,
- "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
- assertTrue(_cluster2._brokerStarter instanceof
MultiClusterHelixBrokerStarter,
- "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
// Setup a test table on both clusters
String testTableName = "multicluster_test_table";
@@ -203,23 +103,22 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
@BeforeGroups("query")
public void setupTablesForQueryTests() throws Exception {
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster1._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME,
_cluster2._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster1._controllerBaseApiUrl);
- dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2,
_cluster2._controllerBaseApiUrl);
- setupFirstLogicalFederatedTable();
- setupSecondLogicalFederatedTable();
- createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
- createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
- LOGICAL_FEDERATION_CLUSTER_1_TABLE_2,
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
+ dropLogicalTableIfExists(getLogicalTableName(),
_cluster1._controllerBaseApiUrl);
+ dropLogicalTableIfExists(getLogicalTableName(),
_cluster2._controllerBaseApiUrl);
+ dropLogicalTableIfExists(getLogicalTableName2(),
_cluster1._controllerBaseApiUrl);
+ dropLogicalTableIfExists(getLogicalTableName2(),
_cluster2._controllerBaseApiUrl);
+ setupPhysicalTables();
+ createLogicalTableOnBothClusters(getLogicalTableName(),
+ getPhysicalTable1InCluster1(), getPhysicalTable1InCluster2());
+ createLogicalTableOnBothClusters(getLogicalTableName2(),
+ getPhysicalTable2InCluster1(), getPhysicalTable2InCluster2());
cleanSegmentDirs();
- loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1),
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
- loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2),
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
+ loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1),
getPhysicalTable1InCluster1(), _cluster1);
+ loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2),
getPhysicalTable1InCluster2(), _cluster2);
loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1,
1, SEGMENTS_PER_CLUSTER),
- LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
+ getPhysicalTable2InCluster1(), _cluster1);
loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2,
2, SEGMENTS_PER_CLUSTER),
- LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
+ getPhysicalTable2InCluster2(), _cluster2);
}
@Test(dataProvider = "queryModes", groups = "query")
@@ -233,8 +132,8 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
if (isJoinQuery) {
// Join query test
String joinQuery = queryOptions
- + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME + " t1 "
- + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " =
t2." + JOIN_COLUMN + " "
+ + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " +
getLogicalTableName() + " t1 "
+ + "JOIN " + getLogicalTableName2() + " t2 ON t1." + JOIN_COLUMN + "
= t2." + JOIN_COLUMN + " "
+ "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
assertNotNull(result);
@@ -244,498 +143,189 @@ public class MultiClusterIntegrationTest extends
ClusterTest {
}
// Count query test (all modes)
- String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " +
LOGICAL_TABLE_NAME;
+ String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " +
getLogicalTableName();
String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
assertEquals(parseCountResult(countResult), expectedTotal);
verifyUnavailableClusterException(countResult, expectUnavailableException);
}
- /**
- * Data provider for all query mode combinations: broker mode x query
engine/options.
- * Each test case has: testName, queryOptions, isJoinQuery, brokerPort,
expectUnavailableException
- */
- @DataProvider(name = "queryModes")
- public Object[][] queryModes() {
- int normalBroker = _cluster1._brokerPort;
- int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
-
- String sseOpts = "SET enableMultiClusterRouting=true; ";
- String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
- String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
- String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
-
- return new Object[][]{
- // SSE tests (count only)
- {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
- {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
- // MSE tests (join + count)
- {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
- {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
- // Physical optimizer tests (join + count)
- {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker,
false},
- {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true,
unavailableBroker, true},
- // MSELiteMode tests (join + count)
- {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
- {"MSELiteMode-UnavailableBroker", mseLiteOpts, true,
unavailableBroker, true},
- };
- }
-
- @Override
- protected BaseBrokerStarter createBrokerStarter() {
- return new MultiClusterHelixBrokerStarter();
- }
-
- protected static class ClusterConfig {
- final String _name;
- final int _basePort;
+ @Test(dataProvider = "queryModesWithoutMultiClusterOption", groups = "query")
+ public void testQueriesWithoutMultiClusterOptions(String testName, String
queryOptions, boolean isJoinQuery,
+ int brokerPort) throws Exception {
+ LOGGER.info("Running {} on broker port {} (expecting NO federation)",
testName, brokerPort);
- ClusterConfig(String name, int basePort) {
- _name = name;
- _basePort = basePort;
- }
- }
+ // When enableMultiClusterRouting is NOT set, only local cluster data
should be returned
+ long expectedLocalCount = TABLE_SIZE_CLUSTER_1;
- protected static class ClusterComponents {
- ZkStarter.ZookeeperInstance _zkInstance;
- BaseControllerStarter _controllerStarter;
- BaseBrokerStarter _brokerStarter;
- BaseServerStarter _serverStarter;
- int _controllerPort;
- int _brokerPort;
- int _serverPort;
- String _zkUrl;
- String _controllerBaseApiUrl;
- File _tempDir;
- File _segmentDir;
- File _tarDir;
- }
-
- protected void setupDirectories() throws Exception {
- setupClusterDirectories(_cluster1, "cluster1");
- setupClusterDirectories(_cluster2, "cluster2");
- }
-
- private void setupClusterDirectories(ClusterComponents cluster, String
clusterPrefix) throws Exception {
- cluster._tempDir = new File(FileUtils.getTempDirectory(), clusterPrefix +
"_" + getClass().getSimpleName());
- cluster._segmentDir = new File(cluster._tempDir, "segmentDir");
- cluster._tarDir = new File(cluster._tempDir, "tarDir");
- TestUtils.ensureDirectoriesExistAndEmpty(cluster._tempDir,
cluster._segmentDir, cluster._tarDir);
- }
-
- protected void startZookeeper(ClusterComponents cluster) throws Exception {
- cluster._zkInstance = ZkStarter.startLocalZkServer();
- cluster._zkUrl = cluster._zkInstance.getZkUrl();
- }
-
- protected void startControllerInit(ClusterComponents cluster, ClusterConfig
config) throws Exception {
- cluster._controllerPort = findAvailablePort(config._basePort);
- startController(cluster, config);
- }
-
- protected void startCluster(ClusterComponents cluster, ClusterComponents
remoteCluster,
- ClusterConfig config) throws Exception {
- cluster._brokerPort = findAvailablePort(cluster._controllerPort + 1000);
- startBroker(cluster, remoteCluster, config);
- cluster._serverPort = findAvailablePort(cluster._brokerPort + 1000);
- startServerWithMSE(cluster, config);
- }
-
- protected void startController(ClusterComponents cluster, ClusterConfig
config) throws Exception {
- Map<String, Object> controllerConfig = new HashMap<>();
- controllerConfig.put(ControllerConf.ZK_STR, cluster._zkUrl);
- controllerConfig.put(ControllerConf.HELIX_CLUSTER_NAME, config._name);
- controllerConfig.put(ControllerConf.CONTROLLER_HOST,
ControllerTest.LOCAL_HOST);
- controllerConfig.put(ControllerConf.CONTROLLER_PORT,
cluster._controllerPort);
- controllerConfig.put(ControllerConf.DATA_DIR,
cluster._tempDir.getAbsolutePath());
- controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR,
cluster._tempDir.getAbsolutePath());
- controllerConfig.put(ControllerConf.DISABLE_GROOVY, false);
- controllerConfig.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
- controllerConfig.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
-
- cluster._controllerStarter = createControllerStarter();
- cluster._controllerStarter.init(new PinotConfiguration(controllerConfig));
- cluster._controllerStarter.start();
- cluster._controllerBaseApiUrl = "http://localhost:" +
cluster._controllerPort;
- }
-
- protected void startBroker(ClusterComponents cluster, ClusterComponents
remoteCluster,
- ClusterConfig config) throws Exception {
- PinotConfiguration brokerConfig = new PinotConfiguration();
- brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
- String remoteClusterName = CLUSTER_1_NAME.equalsIgnoreCase(config._name) ?
CLUSTER_2_NAME : CLUSTER_1_NAME;
- brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES,
remoteClusterName);
-
brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS,
remoteClusterName),
- remoteCluster._zkUrl);
- brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
- brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME,
ControllerTest.LOCAL_HOST);
- brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT,
cluster._brokerPort);
- brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
- brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
- brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
- cluster._brokerStarter = createBrokerStarter();
- cluster._brokerStarter.init(brokerConfig);
- cluster._brokerStarter.start();
- }
-
- protected void startServerWithMSE(ClusterComponents cluster, ClusterConfig
config) throws Exception {
- PinotConfiguration serverConfig = new PinotConfiguration();
- serverConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
- serverConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
- serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
ControllerTest.LOCAL_HOST);
- serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR,
cluster._tempDir + "/dataDir");
- serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
cluster._tempDir + "/segmentTar");
- serverConfig.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
- serverConfig.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
false);
- serverConfig.setProperty(Server.CONFIG_OF_ADMIN_API_PORT,
findAvailablePort(cluster._serverPort));
- serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_PORT,
findAvailablePort(cluster._serverPort + 1));
- serverConfig.setProperty(Server.CONFIG_OF_GRPC_PORT,
findAvailablePort(cluster._serverPort + 2));
-
serverConfig.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
true);
- serverConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
- serverConfig.setProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
-
- cluster._serverStarter = createServerStarter();
- cluster._serverStarter.init(serverConfig);
- cluster._serverStarter.start();
- }
-
- protected int findAvailablePort(int basePort) {
- try {
- return NetUtils.findOpenPort(basePort);
- } catch (Exception e) {
- throw new RuntimeException("Failed to find available port starting from
" + basePort, e);
- }
- }
-
- protected List<File> createAvroData(int dataSize, int clusterId) throws
Exception {
- return createAvroDataMultipleSegments(dataSize, clusterId, 1);
- }
-
- protected List<File> createAvroDataMultipleSegments(int totalDataSize, int
clusterId, int numSegments)
- throws Exception {
- Schema schema = createSchema(SCHEMA_FILE);
- org.apache.avro.Schema avroSchema = createAvroSchema(schema);
- File tempDir = (clusterId == 1) ? _cluster1._tempDir : _cluster2._tempDir;
- List<File> avroFiles = new ArrayList<>();
-
- for (int segment = 0; segment < numSegments; segment++) {
- File avroFile = new File(tempDir, "cluster" + clusterId +
"_data_segment" + segment + ".avro");
- try (DataFileWriter<GenericData.Record> writer = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- writer.create(avroSchema, avroFile);
- int start = segment * (totalDataSize / numSegments);
- int end = (segment == numSegments - 1) ? totalDataSize : (segment + 1)
* (totalDataSize / numSegments);
- for (int i = start; i < end; i++) {
- GenericData.Record record = new GenericData.Record(avroSchema);
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- record.put(fieldSpec.getName(),
generateFieldValue(fieldSpec.getName(), i, clusterId,
- fieldSpec.getDataType()));
- }
- writer.append(record);
- }
- }
- avroFiles.add(avroFile);
- }
- return avroFiles;
- }
-
- private org.apache.avro.Schema createAvroSchema(Schema schema) {
- org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
- List<org.apache.avro.Schema.Field> fields = new ArrayList<>();
-
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- org.apache.avro.Schema.Type avroType =
getAvroType(fieldSpec.getDataType());
- fields.add(new org.apache.avro.Schema.Field(fieldSpec.getName(),
- org.apache.avro.Schema.create(avroType), null, null));
- }
- avroSchema.setFields(fields);
- return avroSchema;
- }
-
- private org.apache.avro.Schema.Type getAvroType(FieldSpec.DataType type) {
- switch (type) {
- case INT: return org.apache.avro.Schema.Type.INT;
- case LONG: return org.apache.avro.Schema.Type.LONG;
- case FLOAT: return org.apache.avro.Schema.Type.FLOAT;
- case DOUBLE: return org.apache.avro.Schema.Type.DOUBLE;
- case BOOLEAN: return org.apache.avro.Schema.Type.BOOLEAN;
- default: return org.apache.avro.Schema.Type.STRING;
- }
- }
-
- private Object generateFieldValue(String fieldName, int index, int
clusterId, FieldSpec.DataType dataType) {
- int baseValue = index + (clusterId * 10000);
- switch (dataType) {
- case INT: return index + 10000;
- case LONG: return (long) baseValue;
- case FLOAT: return (float) (baseValue + 0.1);
- case DOUBLE: return (double) (baseValue + 0.1);
- case BOOLEAN: return (baseValue % 2) == 0;
- default: return "cluster_" + fieldName + "_" + index;
- }
- }
+ if (isJoinQuery) {
+ // Join query test - should only execute locally
+ String joinQuery = queryOptions
+ + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " +
getLogicalTableName() + " t1 "
+ + "JOIN " + getLogicalTableName2() + " t2 ON t1." + JOIN_COLUMN + "
= t2." + JOIN_COLUMN + " "
+ + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
+ String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
+ assertNotNull(result);
+ assertTrue(result.contains("resultTable"), "Expected resultTable in
response: " + result);
- protected void loadDataIntoCluster(List<File> avroFiles, String tableName,
ClusterComponents cluster)
- throws Exception {
- cleanDirectories(cluster._segmentDir, cluster._tarDir);
- Schema schema = createSchema(SCHEMA_FILE);
- schema.setSchemaName(tableName);
- TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
- .setTableName(tableName)
- .setTimeColumnName(TIME_COLUMN)
- .build();
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0,
- cluster._segmentDir, cluster._tarDir);
- uploadSegmentsToCluster(tableName, cluster._tarDir,
cluster._controllerBaseApiUrl);
- Thread.sleep(2000);
- }
+ // Verify no federation occurred by checking that only local cluster
data is present
+ JsonNode rows =
JsonMapper.builder().build().readTree(result).get("resultTable").get("rows");
+ assertNotNull(rows, "Result rows should exist");
- private void cleanDirectories(File... dirs) {
- for (File dir : dirs) {
- try {
- FileUtils.cleanDirectory(dir);
- } catch (IOException e) {
- // Ignore cleanup errors
+ for (JsonNode row : rows) {
+ int count = row.get(1).asInt();
+ assertTrue(count == 1,
+ "Expected local-only join count of 1, but got " + count + " for
row: " + row);
}
}
- }
- protected void uploadSegmentsToCluster(String tableName, File tarDir, String
controllerBaseApiUrl) throws Exception {
- File[] segmentTarFiles = tarDir.listFiles();
- assertNotNull(segmentTarFiles);
- assertTrue(segmentTarFiles.length > 0);
+ // Count query test - should only count local cluster data
+ String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " +
getLogicalTableName();
+ String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
+ long actualCount = parseCountResult(countResult);
- URI uploadSegmentHttpURI = URI.create(controllerBaseApiUrl + "/segments");
+ assertEquals(actualCount, expectedLocalCount,
+ "Expected local cluster count of " + expectedLocalCount + " but got "
+ actualCount);
- try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
- for (File segmentTarFile : segmentTarFiles) {
- int status =
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
- segmentTarFile.getName(), segmentTarFile, List.of(),
tableName, TableType.OFFLINE)
- .getStatusCode();
- assertEquals(status, HttpStatus.SC_OK);
+ // Verify no federation-related exceptions
+ JsonNode resultJson = JsonMapper.builder().build().readTree(countResult);
+ JsonNode exceptions = resultJson.get("exceptions");
+ if (exceptions != null && exceptions.size() > 0) {
+ for (JsonNode ex : exceptions) {
+ String message = ex.get("message").asText();
+ assertTrue(!message.contains("multicluster") &&
!message.contains("federation"),
+ "Unexpected multicluster/federation exception: " + message);
}
}
- Thread.sleep(3000);
- }
-
-
- protected void createSchemaAndTableForCluster(String tableName, String
controllerBaseApiUrl) throws IOException {
- Schema schema = createSchema(SCHEMA_FILE);
- schema.setSchemaName(tableName);
- addSchemaToCluster(schema, controllerBaseApiUrl);
- TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
- .setTableName(tableName)
- .setTimeColumnName(TIME_COLUMN)
- .build();
- addTableConfigToCluster(tableConfig, controllerBaseApiUrl);
- }
-
- protected void createSchemaAndTableOnBothClusters(String tableName) throws
Exception {
- dropTableAndSchemaIfExists(tableName, _cluster1._controllerBaseApiUrl);
- dropTableAndSchemaIfExists(tableName, _cluster2._controllerBaseApiUrl);
- createSchemaAndTableForCluster(tableName, _cluster1._controllerBaseApiUrl);
- createSchemaAndTableForCluster(tableName, _cluster2._controllerBaseApiUrl);
- }
-
- protected void dropTableAndSchemaIfExists(String tableName, String
controllerBaseApiUrl) {
- dropResource(controllerBaseApiUrl + "/tables/" + tableName);
- dropResource(controllerBaseApiUrl + "/schemas/" + tableName);
+ LOGGER.info("Verified {} returned only local cluster data (count={})",
testName, actualCount);
}
- private void dropResource(String url) {
- try {
- ControllerTest.sendDeleteRequest(url);
- } catch (Exception e) {
- // Ignore
- }
- }
-
- protected void addSchemaToCluster(Schema schema, String
controllerBaseApiUrl) throws IOException {
- String url = controllerBaseApiUrl + "/schemas";
- String schemaJson = schema.toPrettyJsonString();
- String response = ControllerTest.sendPostRequest(url, schemaJson);
- assertNotNull(response);
+ @Test(dataProvider = "physicalTableQueryModes", groups = "query")
+ public void testPhysicalTablesAlwaysQueryLocalCluster(String testName,
String queryOptions, int brokerPort,
+ boolean expectValidationError)
+ throws Exception {
+ String physicalTableName = getPhysicalTable1InCluster1() + "_OFFLINE";
+ verifyPhysicalTableLocalOnly(physicalTableName, queryOptions, brokerPort,
expectValidationError, testName);
}
- protected void addTableConfigToCluster(TableConfig tableConfig, String
controllerBaseApiUrl) throws IOException {
- String url = controllerBaseApiUrl + "/tables";
- String tableConfigJson = JsonUtils.objectToPrettyString(tableConfig);
- String response = ControllerTest.sendPostRequest(url, tableConfigJson);
- assertNotNull(response);
- }
+ @DataProvider(name = "queryModesWithoutMultiClusterOption")
+ public Object[][] queryModesWithoutMultiClusterOption() {
+ int normalBroker = _cluster1._brokerPort;
- protected String executeQuery(String query, ClusterComponents cluster)
throws Exception {
- return executeQueryOnBrokerPort(query, cluster._brokerPort);
- }
+ String noOpts = "";
+ String onlyMseOpts = "SET useMultistageEngine=true; ";
+ String onlyPhysOptOpts = "SET useMultistageEngine=true; SET
usePhysicalOptimizer=true; ";
+ String onlyMseLiteOpts = "SET useMultistageEngine=true; SET
usePhysicalOptimizer=true; SET runInBroker=true; ";
+ String explicitlyDisabledOpts = "SET enableMultiClusterRouting=false; ";
+ String explicitlyDisabledMseOpts = "SET enableMultiClusterRouting=false;
SET useMultistageEngine=true; ";
- protected String executeQueryOnBrokerPort(String query, int brokerPort)
throws Exception {
- Map<String, Object> payload = Map.of("sql", query);
- String url = "http://localhost:" + brokerPort + "/query/sql";
- return ControllerTest.sendPostRequest(url,
JsonUtils.objectToPrettyString(payload));
+ return new Object[][]{
+ {"SSE-NoMultiClusterOption", noOpts, false, normalBroker},
+ {"SSE-ExplicitlyDisabled", explicitlyDisabledOpts, false,
normalBroker},
+ {"MSE-NoMultiClusterOption", onlyMseOpts, true, normalBroker},
+ {"MSE-ExplicitlyDisabled", explicitlyDisabledMseOpts, true,
normalBroker},
+ {"PhysicalOptimizer-NoMultiClusterOption", onlyPhysOptOpts, true,
normalBroker},
+ {"MSELiteMode-NoMultiClusterOption", onlyMseLiteOpts, true,
normalBroker},
+ };
}
- protected void verifyUnavailableClusterException(String result, boolean
expectException) throws Exception {
- if (expectException) {
- assertTrue(result.contains(UNAVAILABLE_CLUSTER_NAME),
- "Response should mention unavailable cluster: " +
UNAVAILABLE_CLUSTER_NAME);
- JsonNode resultJson = JsonMapper.builder().build().readTree(result);
- JsonNode exceptions = resultJson.get("exceptions");
- assertNotNull(exceptions, "Exceptions array should exist");
- boolean found = false;
- for (JsonNode ex : exceptions) {
- if (ex.get("errorCode").asInt() == 510
- && ex.get("message").asText().contains(UNAVAILABLE_CLUSTER_NAME)) {
- found = true;
- break;
- }
- }
- assertTrue(found, "Should find REMOTE_CLUSTER_UNAVAILABLE (510)
exception");
- }
- }
+ @DataProvider(name = "physicalTableQueryModes")
+ public Object[][] physicalTableQueryModes() {
+ int normalBroker = _cluster1._brokerPort;
- protected long parseCountResult(String result) {
- try {
- JsonNode rows =
JsonMapper.builder().build().readTree(result).path("resultTable").path("rows");
- if (rows.isArray() && rows.size() > 0) {
- JsonNode firstRow = rows.get(0);
- if (firstRow.isArray() && firstRow.size() > 0) {
- return Long.parseLong(firstRow.get(0).asText());
- }
- }
- } catch (Exception e) {
- // Ignore
- }
- return 0;
- }
+ String withMultiClusterOpts = "SET enableMultiClusterRouting=true; ";
+ String withMultiClusterMseOpts = "SET enableMultiClusterRouting=true; SET
useMultistageEngine=true; ";
+ String noOpts = "";
+ String onlyMseOpts = "SET useMultistageEngine=true; ";
+ String explicitlyDisabledOpts = "SET enableMultiClusterRouting=false; ";
- protected Schema createSchema(String schemaFileName) throws IOException {
- InputStream schemaInputStream =
getClass().getClassLoader().getResourceAsStream(schemaFileName);
- assertNotNull(schemaInputStream, "Schema file not found: " +
schemaFileName);
- return Schema.fromInputStream(schemaInputStream);
+ return new Object[][]{
+ {"PhysicalTable-SSE-WithMultiClusterRouting", withMultiClusterOpts,
normalBroker, true},
+ {"PhysicalTable-MSE-WithMultiClusterRouting", withMultiClusterMseOpts,
normalBroker, true},
+ {"PhysicalTable-SSE-NoOptions", noOpts, normalBroker, false},
+ {"PhysicalTable-MSE-NoOptions", onlyMseOpts, normalBroker, false},
+ {"PhysicalTable-SSE-ExplicitlyDisabled", explicitlyDisabledOpts,
normalBroker, false},
+ };
}
- @AfterClass
- public void tearDown() throws Exception {
- // Stop the alternate broker with unavailable cluster
- if (_brokerWithUnavailableCluster != null &&
_brokerWithUnavailableCluster._brokerStarter != null) {
- try {
- _brokerWithUnavailableCluster._brokerStarter.stop();
- } catch (Exception e) {
- LOGGER.warn("Error stopping broker with unavailable cluster", e);
- }
- }
- stopCluster(_cluster1);
- stopCluster(_cluster2);
- }
+ @DataProvider(name = "queryModes")
+ public Object[][] queryModes() {
+ int normalBroker = _cluster1._brokerPort;
+ int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
- private void stopCluster(ClusterComponents cluster) {
- if (cluster == null) {
- return;
- }
- try {
- if (cluster._serverStarter != null) {
- cluster._serverStarter.stop();
- }
- if (cluster._brokerStarter != null) {
- cluster._brokerStarter.stop();
- }
- if (cluster._controllerStarter != null) {
- cluster._controllerStarter.stop();
- }
- if (cluster._zkInstance != null) {
- ZkStarter.stopLocalZkServer(cluster._zkInstance);
- }
- FileUtils.deleteQuietly(cluster._tempDir);
- } catch (Exception e) {
- LOGGER.warn("Error stopping cluster", e);
- }
- }
+ String sseOpts = "SET enableMultiClusterRouting=true; ";
+ String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
+ String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
+ String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
- protected void cleanSegmentDirs() {
- cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir,
_cluster2._segmentDir, _cluster2._tarDir);
+ return new Object[][]{
+ {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
+ {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
+ {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
+ {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
+ {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker,
false},
+ {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true,
unavailableBroker, true},
+ {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
+ {"MSELiteMode-UnavailableBroker", mseLiteOpts, true,
unavailableBroker, true},
+ };
}
- protected long getCount(String tableName, ClusterComponents cluster, boolean
enableMultiClusterRouting)
- throws Exception {
- String query = "SET enableMultiClusterRouting=" +
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
- + tableName;
- return parseCountResult(executeQuery(query, cluster));
- }
+ /**
+ * Sets up physical tables based on configuration. By default, creates
different table names
+ * in each cluster. Subclasses can override to create same-named tables.
+ */
+ protected void setupPhysicalTables() throws Exception {
+ // Cluster 1 tables
+ dropTableAndSchemaIfExists(getPhysicalTable1InCluster1(),
_cluster1._controllerBaseApiUrl);
+ dropTableAndSchemaIfExists(getPhysicalTable2InCluster1(),
_cluster1._controllerBaseApiUrl);
+ createSchemaAndTableForCluster(getPhysicalTable1InCluster1(),
_cluster1._controllerBaseApiUrl);
+ createSchemaAndTableForCluster(getPhysicalTable2InCluster1(),
_cluster1._controllerBaseApiUrl);
- /*
- Logical table helper methods
- */
- protected void createLogicalTable(String schemaFile,
- Map<String, PhysicalTableConfig> physicalTableConfigMap, String
brokerTenant, String controllerBaseApiUrl,
- String logicalTable, String refOfflineTable, String refRealtimeTable)
throws IOException {
- ControllerRequestURLBuilder urlBuilder =
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
- ControllerRequestClient client = new ControllerRequestClient(urlBuilder,
getHttpClient(),
- getControllerRequestClientHeaders());
- Schema schema = createSchema(schemaFile);
- schema.setSchemaName(logicalTable);
- client.addSchema(schema);
- LogicalTableConfig config = new LogicalTableConfigBuilder()
- .setTableName(logicalTable)
- .setBrokerTenant(brokerTenant)
- .setRefOfflineTableName(refOfflineTable)
- .setRefRealtimeTableName(refRealtimeTable)
- .setPhysicalTableConfigMap(physicalTableConfigMap)
- .build();
- String response =
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
- config.toSingleLineJsonString(), Map.of());
- assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" +
logicalTable
- + " logical table successfully added.\"}");
+ // Cluster 2 tables
+ dropTableAndSchemaIfExists(getPhysicalTable1InCluster2(),
_cluster2._controllerBaseApiUrl);
+ dropTableAndSchemaIfExists(getPhysicalTable2InCluster2(),
_cluster2._controllerBaseApiUrl);
+ createSchemaAndTableForCluster(getPhysicalTable1InCluster2(),
_cluster2._controllerBaseApiUrl);
+ createSchemaAndTableForCluster(getPhysicalTable2InCluster2(),
_cluster2._controllerBaseApiUrl);
}
protected void createLogicalTableOnBothClusters(String logicalTableName,
String cluster1PhysicalTable, String cluster2PhysicalTable) throws
IOException {
- // For cluster 1: cluster1's table is local (isMultiCluster=false),
cluster2's table is remote (isMultiCluster=true)
- Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap = Map.of(
- cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false),
- cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
- );
-
- // For cluster 2: cluster2's table is local (isMultiCluster=false),
cluster1's table is remote (isMultiCluster=true)
- Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap = Map.of(
- cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
- cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false)
- );
+ // Automatically detect whether both clusters use the same physical table
name: if so, both entries must be marked as multi-cluster (true).
+ // If the names differ: the local table is marked false, the remote table
true.
+ boolean areSameNamedTables =
cluster1PhysicalTable.equals(cluster2PhysicalTable);
+
+ Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap;
+ Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap;
+
+ if (areSameNamedTables) {
+ // Same table name: only one entry, marked as multi-cluster
+ cluster1PhysicalTableConfigMap = Map.of(
+ cluster1PhysicalTable + "_OFFLINE",
+ new PhysicalTableConfig(true) // Same-named table must be marked as
multi-cluster
+ );
+ cluster2PhysicalTableConfigMap = Map.of(
+ cluster1PhysicalTable + "_OFFLINE",
+ new PhysicalTableConfig(true) // Same-named table must be marked as
multi-cluster
+ );
+ } else {
+ // Different table names: local is false, remote is true
+ cluster1PhysicalTableConfigMap = Map.of(
+ cluster1PhysicalTable + "_OFFLINE",
+ new PhysicalTableConfig(false), // Local table
+ cluster2PhysicalTable + "_OFFLINE",
+ new PhysicalTableConfig(true) // Remote table
+ );
+ cluster2PhysicalTableConfigMap = Map.of(
+ cluster1PhysicalTable + "_OFFLINE",
+ new PhysicalTableConfig(true), // Remote table
+ cluster2PhysicalTable + "_OFFLINE",
+ new PhysicalTableConfig(false) // Local table
+ );
+ }
createLogicalTable(SCHEMA_FILE, cluster1PhysicalTableConfigMap,
DEFAULT_TENANT,
_cluster1._controllerBaseApiUrl, logicalTableName,
cluster1PhysicalTable + "_OFFLINE", null);
createLogicalTable(SCHEMA_FILE, cluster2PhysicalTableConfigMap,
DEFAULT_TENANT,
_cluster2._controllerBaseApiUrl, logicalTableName,
cluster2PhysicalTable + "_OFFLINE", null);
}
-
- protected void dropLogicalTableIfExists(String logicalTableName, String
controllerBaseApiUrl) {
- dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
- }
-
- protected void setupFirstLogicalFederatedTable() throws Exception {
- setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE,
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
- }
-
- protected void setupSecondLogicalFederatedTable() throws Exception {
- setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE_2,
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
- }
-
- protected void setupLogicalFederatedTable(String cluster1TableName, String
cluster2TableName) throws Exception {
- dropTableAndSchemaIfExists(cluster1TableName,
_cluster1._controllerBaseApiUrl);
- dropTableAndSchemaIfExists(cluster2TableName,
_cluster2._controllerBaseApiUrl);
- createSchemaAndTableForCluster(cluster1TableName,
_cluster1._controllerBaseApiUrl);
- createSchemaAndTableForCluster(cluster2TableName,
_cluster2._controllerBaseApiUrl);
- }
-
- protected void assertResultRows(String resultJson) throws Exception {
- JsonNode rows =
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
- assertNotNull(rows);
- for (JsonNode row : rows) {
- int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
- // Depending on the number of records with the same join key in each
cluster, the expected count varies.
- // If the number is less than the size of the smaller cluster, it should
appear in both clusters,
- // resulting in 4 records (2 from each cluster).
- // Otherwise, it should appear only in one cluster, resulting in 1
record.
- int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1,
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
- assertEquals(row.get(1).asInt(), expectedCount);
- }
- }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/SameTableNameMultiClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/SameTableNameMultiClusterIntegrationTest.java
new file mode 100644
index 00000000000..29a58f8980a
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/SameTableNameMultiClusterIntegrationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.multicluster;
+
+/**
+ * Integration tests for multi-cluster routing when the SAME physical table
name exists in both clusters.
+ * This class extends {@link MultiClusterIntegrationTest} and inherits all its
tests, but configures
+ * both clusters to use identical physical table names (unlike the parent
which uses different names).
+ */
+public class SameTableNameMultiClusterIntegrationTest extends
MultiClusterIntegrationTest {
+ /**
+ * Override to use cluster1's table name in cluster2 as well (same physical
table name).
+ */
+ @Override
+ protected String getPhysicalTable1InCluster2() {
+ return getPhysicalTable1InCluster1();
+ }
+
+ /**
+ * Override to use cluster1's table name in cluster2 as well (same physical
table name).
+ */
+ @Override
+ protected String getPhysicalTable2InCluster2() {
+ return getPhysicalTable2InCluster1();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]