This is an automated email from the ASF dual-hosted git repository.

shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b396056caf9 [federation] Disallow multi-cluster routing for physical 
tables (#17731)
b396056caf9 is described below

commit b396056caf9cdd5c2a32b8790a46641908d73247
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Feb 25 10:46:17 2026 -0800

    [federation] Disallow multi-cluster routing for physical tables (#17731)
    
    This PR enforces validation to prevent physical tables from being queried 
with enableMultiClusterRouting=true, since multi-cluster federation is 
supported only for logical tables. If a physical table is queried with 
multi-cluster routing enabled, the broker now returns a QUERY_VALIDATION error 
with a clear message.
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  29 +
 .../BaseSingleStageBrokerRequestHandler.java       |  13 +
 .../MultiStageBrokerRequestHandler.java            |   1 +
 ...t.java => BaseMultiClusterIntegrationTest.java} | 393 +++++------
 .../multicluster/MultiClusterIntegrationTest.java  | 776 +++++----------------
 .../SameTableNameMultiClusterIntegrationTest.java  |  42 ++
 6 files changed, 438 insertions(+), 816 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5b18088ccac..30d09fa2360 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -63,6 +63,7 @@ import 
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
 import 
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -261,6 +262,34 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     return tableAuthorizationResult;
   }
 
+  /**
+   * Validates that enableMultiClusterRouting=true is requested only for 
logical tables; queries without that option pass unchecked.
+   * Physical tables are cluster-specific and cannot be federated across 
clusters.
+   * Multi-cluster routing is only supported for logical tables.
+   *
+   * @param tableNames Set of table names to validate
+   * @param queryOptions Map of query options
+   * @throws QueryException if any physical table is queried with 
enableMultiClusterRouting=true
+   */
+  protected void validatePhysicalTablesWithMultiClusterRouting(Set<String> 
tableNames,
+      Map<String, String> queryOptions) {
+    Preconditions.checkNotNull(tableNames, "Table names cannot be null when 
validating multi-cluster routing");
+    Preconditions.checkNotNull(queryOptions, "Query options cannot be null");
+    boolean isMultiClusterRoutingEnabled = 
queryOptions.containsKey(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING)
+        && 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING));
+
+    if (isMultiClusterRoutingEnabled) {
+      for (String tableName : tableNames) {
+        if (!_tableCache.isLogicalTable(tableName)) {
+          throw QueryErrorCode.QUERY_VALIDATION.asException(
+              "Physical table '" + tableName + "' cannot be queried with 
enableMultiClusterRouting=true. "
+              + "Multi-cluster routing is only supported for logical tables. "
+              + "Please remove the enableMultiClusterRouting query option or 
use a logical table instead.");
+        }
+      }
+    }
+  }
+
   /**
    * Returns true if the QPS quota of query tables, database or application 
has been exceeded.
    */
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 62ddffa613b..9431ea311d3 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -115,6 +115,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.exception.DatabaseConflictException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.query.QueryExecutionContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.QueryFingerprint;
@@ -430,6 +431,18 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     LogicalTableConfig logicalTableConfig = 
_tableCache.getLogicalTableConfig(rawTableName);
     String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
     long compilationEndTimeNs = System.nanoTime();
+
+    // Validate that physical tables are not queried with multi-cluster 
routing enabled.
+    // Unlike the MSE, the SSE has no centralized exception handler that 
converts QueryException into
+    // BrokerResponseNative, so we must catch and convert here to return a 
proper JSON error response.
+    try {
+      validatePhysicalTablesWithMultiClusterRouting(Set.of(tableName), 
sqlNodeAndOptions.getOptions());
+    } catch (QueryException e) {
+      LOGGER.warn("Request {}: {}", requestId, e.getMessage());
+      requestContext.setErrorCode(e.getErrorCode());
+      return new BrokerResponseNative(e.getErrorCode(), e.getMessage());
+    }
+
     // full request compile time = compilationTimeNs + parserTimeNs
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
         (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f589d90ee96..8d8afe0598a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -358,6 +358,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, 
mseWorkerInfo, _threadAccountant);
         QueryEnvironment.CompiledQuery compiledQuery = compileQuery(requestId, 
query, sqlNodeAndOptions, requestContext,
             httpHeaders, queryTimer)) {
+      
validatePhysicalTablesWithMultiClusterRouting(compiledQuery.getTableNames(), 
compiledQuery.getOptions());
       AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
       checkAuthorization(requesterIdentity, requestContext, httpHeaders, 
compiledQuery, rlsFiltersApplied);
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/BaseMultiClusterIntegrationTest.java
similarity index 73%
copy from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
copy to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/BaseMultiClusterIntegrationTest.java
index ad2cdbc72a3..ac24426a058 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/BaseMultiClusterIntegrationTest.java
@@ -65,31 +65,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeGroups;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
-public class MultiClusterIntegrationTest extends ClusterTest {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
+/**
+ * Base class for multi-cluster integration tests. Contains common 
setup/teardown logic,
+ * utility methods, and helper functions for managing multi-cluster test 
environments.
+ */
+public abstract class BaseMultiClusterIntegrationTest extends ClusterTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseMultiClusterIntegrationTest.class);
 
   protected static final String SCHEMA_FILE = 
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
   protected static final String TIME_COLUMN = "DaysSinceEpoch";
-  // TODO: N clusters instead of 2 in future iterations.
   protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1";
   protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
   protected static final ClusterConfig CLUSTER_1_CONFIG = new 
ClusterConfig(CLUSTER_1_NAME, 30000);
   protected static final ClusterConfig CLUSTER_2_CONFIG = new 
ClusterConfig(CLUSTER_2_NAME, 40000);
   protected static final String DEFAULT_TENANT = "DefaultTenant";
-  protected static final String LOGICAL_TABLE_NAME = "logical_table";
-  protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE = 
"logical_federation_table_cluster1";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE = 
"logical_federation_table_cluster2";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE_2 = 
"logical_federation_table2_cluster1";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE_2 = 
"logical_federation_table2_cluster2";
   protected static final int TABLE_SIZE_CLUSTER_1 = 1500;
   protected static final int TABLE_SIZE_CLUSTER_2 = 1000;
   protected static final int SEGMENTS_PER_CLUSTER = 3;
@@ -105,7 +99,7 @@ public class MultiClusterIntegrationTest extends ClusterTest 
{
 
   @BeforeClass
   public void setUp() throws Exception {
-    LOGGER.info("Setting up MultiClusterIntegrationTest");
+    LOGGER.info("Setting up BaseMultiClusterIntegrationTest");
 
     // Initialize cluster components
     _cluster1 = new ClusterComponents();
@@ -123,14 +117,13 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     startControllerInit(_cluster2, CLUSTER_2_CONFIG);
 
     // Start brokers and servers for both clusters
-    // Note: Each cluster's broker is configured to know about the other 
cluster as remote
     startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
     startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
 
     // Start an alternate broker with one valid and one unavailable remote 
cluster
     startBrokerWithUnavailableCluster();
 
-    LOGGER.info("MultiClusterIntegrationTest setup complete");
+    LOGGER.info("BaseMultiClusterIntegrationTest setup complete");
   }
 
   /**
@@ -161,125 +154,6 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     LOGGER.info("Started broker with unavailable cluster on port {}", 
_brokerWithUnavailableCluster._brokerPort);
   }
 
-  // TODO: Add more tests for cross-cluster queries in subsequent iterations.
-  @Test
-  public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
-    LOGGER.info("Testing that multi-cluster broker starts successfully and is 
queryable");
-
-    // Verify both clusters' brokers are running 
(MultiClusterHelixBrokerStarter)
-    assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be 
started");
-    assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be 
started");
-    assertTrue(_cluster1._brokerStarter instanceof 
MultiClusterHelixBrokerStarter,
-        "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
-    assertTrue(_cluster2._brokerStarter instanceof 
MultiClusterHelixBrokerStarter,
-        "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
-
-    // Setup a test table on both clusters
-    String testTableName = "multicluster_test_table";
-    createSchemaAndTableOnBothClusters(testTableName);
-
-    // Create and load test data into both clusters
-    _cluster1AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_1, 1);
-    _cluster2AvroFiles = createAvroData(TABLE_SIZE_CLUSTER_2, 2);
-
-    loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
-    loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
-
-    // Verify cluster 1 is queryable
-    String query = "SELECT COUNT(*) FROM " + testTableName;
-    String result1 = executeQuery(query, _cluster1);
-    assertNotNull(result1, "Query result from cluster 1 should not be null");
-    long count1 = parseCountResult(result1);
-    assertEquals(count1, TABLE_SIZE_CLUSTER_1);
-
-    // Verify cluster 2 is queryable
-    String result2 = executeQuery(query, _cluster2);
-    assertNotNull(result2, "Query result from cluster 2 should not be null");
-    long count2 = parseCountResult(result2);
-    assertEquals(count2, TABLE_SIZE_CLUSTER_2);
-
-    LOGGER.info("Multi-cluster broker test passed: both clusters started and 
queryable");
-  }
-
-  @BeforeGroups("query")
-  public void setupTablesForQueryTests() throws Exception {
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster1._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster2._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster1._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster2._controllerBaseApiUrl);
-    setupFirstLogicalFederatedTable();
-    setupSecondLogicalFederatedTable();
-    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
-      LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
-    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
-      LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
-    cleanSegmentDirs();
-    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1), 
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
-    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2), 
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
-    loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1, 
1, SEGMENTS_PER_CLUSTER),
-      LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
-    loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2, 
2, SEGMENTS_PER_CLUSTER),
-      LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
-  }
-
-  @Test(dataProvider = "queryModes", groups = "query")
-  public void testLogicalFederationQueries(String testName, String 
queryOptions, boolean isJoinQuery,
-      int brokerPort, boolean expectUnavailableException)
-      throws Exception {
-    LOGGER.info("Running {} on broker port {} (expectUnavailableException={})",
-        testName, brokerPort, expectUnavailableException);
-    long expectedTotal = TABLE_SIZE_CLUSTER_1 + TABLE_SIZE_CLUSTER_2;
-
-    if (isJoinQuery) {
-      // Join query test
-      String joinQuery = queryOptions
-          + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + 
LOGICAL_TABLE_NAME + " t1 "
-          + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " = 
t2." + JOIN_COLUMN + " "
-          + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
-      String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
-      assertNotNull(result);
-      assertTrue(result.contains("resultTable"), "Expected resultTable in 
response: " + result);
-      assertResultRows(result);
-      verifyUnavailableClusterException(result, expectUnavailableException);
-    }
-
-    // Count query test (all modes)
-    String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " + 
LOGICAL_TABLE_NAME;
-    String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
-    assertEquals(parseCountResult(countResult), expectedTotal);
-    verifyUnavailableClusterException(countResult, expectUnavailableException);
-  }
-
-  /**
-   * Data provider for all query mode combinations: broker mode x query 
engine/options.
-   * Each test case has: testName, queryOptions, isJoinQuery, brokerPort, 
expectUnavailableException
-   */
-  @DataProvider(name = "queryModes")
-  public Object[][] queryModes() {
-    int normalBroker = _cluster1._brokerPort;
-    int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
-
-    String sseOpts = "SET enableMultiClusterRouting=true; ";
-    String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
-    String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
-    String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
-
-    return new Object[][]{
-        // SSE tests (count only)
-        {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
-        {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
-        // MSE tests (join + count)
-        {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
-        {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
-        // Physical optimizer tests (join + count)
-        {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker, 
false},
-        {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true, 
unavailableBroker, true},
-        // MSELiteMode tests (join + count)
-        {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
-        {"MSELiteMode-UnavailableBroker", mseLiteOpts, true, 
unavailableBroker, true},
-    };
-  }
-
   @Override
   protected BaseBrokerStarter createBrokerStarter() {
     return new MultiClusterHelixBrokerStarter();
@@ -310,6 +184,8 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     File _tarDir;
   }
 
+  // ========== Cluster Setup Methods ==========
+
   protected void setupDirectories() throws Exception {
     setupClusterDirectories(_cluster1, "cluster1");
     setupClusterDirectories(_cluster2, "cluster2");
@@ -406,6 +282,8 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     }
   }
 
+  // ========== Data Generation Methods ==========
+
   protected List<File> createAvroData(int dataSize, int clusterId) throws 
Exception {
     return createAvroDataMultipleSegments(dataSize, clusterId, 1);
   }
@@ -473,6 +351,8 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     }
   }
 
+  // ========== Data Loading Methods ==========
+
   protected void loadDataIntoCluster(List<File> avroFiles, String tableName, 
ClusterComponents cluster)
       throws Exception {
     cleanDirectories(cluster._segmentDir, cluster._tarDir);
@@ -517,6 +397,7 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     Thread.sleep(3000);
   }
 
+  // ========== Schema and Table Management Methods ==========
 
   protected void createSchemaAndTableForCluster(String tableName, String 
controllerBaseApiUrl) throws IOException {
     Schema schema = createSchema(SCHEMA_FILE);
@@ -563,6 +444,36 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     assertNotNull(response);
   }
 
+  // ========== Logical Table Management Methods ==========
+
+  protected void createLogicalTable(String schemaFile,
+      Map<String, PhysicalTableConfig> physicalTableConfigMap, String 
brokerTenant, String controllerBaseApiUrl,
+      String logicalTable, String refOfflineTable, String refRealtimeTable) 
throws IOException {
+    ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
+    ControllerRequestClient client = new ControllerRequestClient(urlBuilder, 
getHttpClient(),
+        getControllerRequestClientHeaders());
+    Schema schema = createSchema(schemaFile);
+    schema.setSchemaName(logicalTable);
+    client.addSchema(schema);
+    LogicalTableConfig config = new LogicalTableConfigBuilder()
+        .setTableName(logicalTable)
+        .setBrokerTenant(brokerTenant)
+        .setRefOfflineTableName(refOfflineTable)
+        .setRefRealtimeTableName(refRealtimeTable)
+        .setPhysicalTableConfigMap(physicalTableConfigMap)
+        .build();
+    String response = 
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
+        config.toSingleLineJsonString(), Map.of());
+    assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" + 
logicalTable
+        + " logical table successfully added.\"}");
+  }
+
+  protected void dropLogicalTableIfExists(String logicalTableName, String 
controllerBaseApiUrl) {
+    dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
+  }
+
+  // ========== Query Execution Methods ==========
+
   protected String executeQuery(String query, ClusterComponents cluster) 
throws Exception {
     return executeQueryOnBrokerPort(query, cluster._brokerPort);
   }
@@ -573,6 +484,49 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     return ControllerTest.sendPostRequest(url, 
JsonUtils.objectToPrettyString(payload));
   }
 
+  protected long getCount(String tableName, ClusterComponents cluster, boolean 
enableMultiClusterRouting)
+      throws Exception {
+    String query = "SET enableMultiClusterRouting=" + 
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
+      + tableName;
+    return parseCountResult(executeQuery(query, cluster));
+  }
+
+  // ========== Result Parsing and Validation Methods ==========
+
+  /**
+   * Helper to verify physical table validation error when 
enableMultiClusterRouting=true is used.
+   */
+  protected void assertPhysicalTableValidationError(JsonNode exceptions) {
+    assertNotNull(exceptions, "Expected validation error for physical table 
with enableMultiClusterRouting=true");
+    assertTrue(exceptions.size() > 0, "Expected validation error exceptions");
+
+    boolean foundValidationError = false;
+    for (JsonNode ex : exceptions) {
+      String message = ex.get("message").asText();
+      int errorCode = ex.get("errorCode").asInt();
+      if (errorCode == 700 && message.contains("Physical table")
+          && message.contains("cannot be queried with 
enableMultiClusterRouting=true")) {
+        foundValidationError = true;
+        break;
+      }
+    }
+    assertTrue(foundValidationError,
+        "Expected validation error stating physical tables cannot be queried 
with enableMultiClusterRouting=true");
+  }
+
+  /**
+   * Helper to verify no federation-related exceptions are present.
+   */
+  protected void assertNoFederationExceptions(JsonNode exceptions) {
+    if (exceptions != null && exceptions.size() > 0) {
+      for (JsonNode ex : exceptions) {
+        String message = ex.get("message").asText();
+        assertTrue(!message.toLowerCase().contains("remote") && 
!message.toLowerCase().contains("federation"),
+            "Physical table queries should not have remote/federation 
exceptions: " + message);
+      }
+    }
+  }
+
   protected void verifyUnavailableClusterException(String result, boolean 
expectException) throws Exception {
     if (expectException) {
       assertTrue(result.contains(UNAVAILABLE_CLUSTER_NAME),
@@ -607,12 +561,94 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     return 0;
   }
 
+  protected void assertResultRows(String resultJson) throws Exception {
+    JsonNode rows = 
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
+    assertNotNull(rows);
+    for (JsonNode row : rows) {
+      int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
+      // Depending on the number of records with the same join key in each 
cluster, the expected count varies.
+      // If the number is less than the size of the smaller cluster, it should 
appear in both clusters,
+      // resulting in 4 records (2 from each cluster).
+      // Otherwise, it should appear only in one cluster, resulting in 1 
record.
+      int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1, 
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
+      assertEquals(row.get(1).asInt(), expectedCount);
+    }
+  }
+
   protected Schema createSchema(String schemaFileName) throws IOException {
     InputStream schemaInputStream = 
getClass().getClassLoader().getResourceAsStream(schemaFileName);
     assertNotNull(schemaInputStream, "Schema file not found: " + 
schemaFileName);
     return Schema.fromInputStream(schemaInputStream);
   }
 
+  // ========== Query Option Helpers ==========
+
+  /**
+   * Builds query option string based on the provided flags.
+   */
+  protected String buildQueryOptions(boolean enableMultiClusterRouting, 
boolean useMultistageEngine,
+      boolean usePhysicalOptimizer, boolean runInBroker) {
+    StringBuilder opts = new StringBuilder();
+    if (enableMultiClusterRouting) {
+      opts.append("SET enableMultiClusterRouting=true; ");
+    }
+    if (useMultistageEngine) {
+      opts.append("SET useMultistageEngine=true; ");
+    }
+    if (usePhysicalOptimizer) {
+      opts.append("SET usePhysicalOptimizer=true; ");
+    }
+    if (runInBroker) {
+      opts.append("SET runInBroker=true; ");
+    }
+    return opts.toString();
+  }
+
+  /**
+   * Common test logic for verifying physical tables always query local 
cluster only.
+   * Physical tables should be rejected when enableMultiClusterRouting=true, 
and should
+   * return local-only data otherwise.
+   *
+   * @param physicalTableName The physical table name (with _OFFLINE suffix)
+   * @param queryOptions Query options string
+   * @param brokerPort Broker port to query
+   * @param expectValidationError Whether to expect a validation error
+   * @param testName Test name for logging
+   */
+  protected void verifyPhysicalTableLocalOnly(String physicalTableName, String 
queryOptions, int brokerPort,
+      boolean expectValidationError, String testName) throws Exception {
+    LOGGER.info("Running {} on broker port {} - physical tables should always 
query local cluster only",
+        testName, brokerPort);
+
+    String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " + 
physicalTableName;
+    String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
+
+    JsonNode resultJson = JsonMapper.builder().build().readTree(countResult);
+    JsonNode exceptions = resultJson.get("exceptions");
+
+    if (expectValidationError) {
+      assertPhysicalTableValidationError(exceptions);
+      LOGGER.info("Verified {} correctly rejected physical table query with 
enableMultiClusterRouting=true", testName);
+    } else {
+      long expectedLocalCount = TABLE_SIZE_CLUSTER_1;
+      long actualCount = parseCountResult(countResult);
+
+      assertEquals(actualCount, expectedLocalCount,
+          "Physical table should always return local cluster count");
+
+      assertNoFederationExceptions(exceptions);
+
+      LOGGER.info("Verified {} returned only local cluster data for physical 
table (count={})",
+          testName, actualCount);
+    }
+  }
+
+  // ========== Cleanup Methods ==========
+
+  protected void cleanSegmentDirs() {
+    cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir, 
_cluster2._segmentDir, _cluster2._tarDir);
+  }
+
   @AfterClass
   public void tearDown() throws Exception {
     // Stop the alternate broker with unavailable cluster
@@ -649,93 +685,4 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
       LOGGER.warn("Error stopping cluster", e);
     }
   }
-
-  protected void cleanSegmentDirs() {
-    cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir, 
_cluster2._segmentDir, _cluster2._tarDir);
-  }
-
-  protected long getCount(String tableName, ClusterComponents cluster, boolean 
enableMultiClusterRouting)
-      throws Exception {
-    String query = "SET enableMultiClusterRouting=" + 
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
-      + tableName;
-    return parseCountResult(executeQuery(query, cluster));
-  }
-
-  /*
-  Logical table helper methods
-  */
-  protected void createLogicalTable(String schemaFile,
-      Map<String, PhysicalTableConfig> physicalTableConfigMap, String 
brokerTenant, String controllerBaseApiUrl,
-      String logicalTable, String refOfflineTable, String refRealtimeTable) 
throws IOException {
-    ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
-    ControllerRequestClient client = new ControllerRequestClient(urlBuilder, 
getHttpClient(),
-        getControllerRequestClientHeaders());
-    Schema schema = createSchema(schemaFile);
-    schema.setSchemaName(logicalTable);
-    client.addSchema(schema);
-    LogicalTableConfig config = new LogicalTableConfigBuilder()
-        .setTableName(logicalTable)
-        .setBrokerTenant(brokerTenant)
-        .setRefOfflineTableName(refOfflineTable)
-        .setRefRealtimeTableName(refRealtimeTable)
-        .setPhysicalTableConfigMap(physicalTableConfigMap)
-        .build();
-    String response = 
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
-        config.toSingleLineJsonString(), Map.of());
-    assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" + 
logicalTable
-        + " logical table successfully added.\"}");
-  }
-
-  protected void createLogicalTableOnBothClusters(String logicalTableName,
-      String cluster1PhysicalTable, String cluster2PhysicalTable) throws 
IOException {
-    // For cluster 1: cluster1's table is local (isMultiCluster=false), 
cluster2's table is remote (isMultiCluster=true)
-    Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap = Map.of(
-        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false),
-        cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
-    );
-
-    // For cluster 2: cluster2's table is local (isMultiCluster=false), 
cluster1's table is remote (isMultiCluster=true)
-    Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap = Map.of(
-        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
-        cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false)
-    );
-
-    createLogicalTable(SCHEMA_FILE, cluster1PhysicalTableConfigMap, 
DEFAULT_TENANT,
-        _cluster1._controllerBaseApiUrl, logicalTableName, 
cluster1PhysicalTable + "_OFFLINE", null);
-    createLogicalTable(SCHEMA_FILE, cluster2PhysicalTableConfigMap, 
DEFAULT_TENANT,
-        _cluster2._controllerBaseApiUrl, logicalTableName, 
cluster2PhysicalTable + "_OFFLINE", null);
-  }
-
-  protected void dropLogicalTableIfExists(String logicalTableName, String 
controllerBaseApiUrl) {
-    dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
-  }
-
-  protected void setupFirstLogicalFederatedTable() throws Exception {
-    setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
-  }
-
-  protected void setupSecondLogicalFederatedTable() throws Exception {
-    setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
-  }
-
-  protected void setupLogicalFederatedTable(String cluster1TableName, String 
cluster2TableName) throws Exception {
-    dropTableAndSchemaIfExists(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
-    dropTableAndSchemaIfExists(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
-    createSchemaAndTableForCluster(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
-    createSchemaAndTableForCluster(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
-  }
-
-  protected void assertResultRows(String resultJson) throws Exception {
-    JsonNode rows = 
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
-    assertNotNull(rows);
-    for (JsonNode row : rows) {
-      int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
-      // Depending on the number of records with the same join key in each 
cluster, the expected count varies.
-      // If the number is less than the size of the smaller cluster, it should 
appear in both clusters,
-      // resulting in 4 records (2 from each cluster).
-      // Otherwise, it should appear only in one cluster, resulting in 1 
record.
-      int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1, 
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
-      assertEquals(row.get(1).asInt(), expectedCount);
-    }
-  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
index ad2cdbc72a3..452e3d1b4b6 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java
@@ -20,51 +20,11 @@ package org.apache.pinot.integration.tests.multicluster;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
-import org.apache.pinot.broker.broker.helix.MultiClusterHelixBrokerStarter;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.BaseControllerStarter;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestClient;
-import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.ClusterTest;
-import org.apache.pinot.server.starter.helix.BaseServerStarter;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.CommonConstants.Broker;
-import org.apache.pinot.spi.utils.CommonConstants.Helix;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
-import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeGroups;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -73,95 +33,39 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
-public class MultiClusterIntegrationTest extends ClusterTest {
+/**
+ * Integration tests for multi-cluster routing and federation.
+ * Tests federation with different table names across clusters and various 
query modes.
+ */
+public class MultiClusterIntegrationTest extends 
BaseMultiClusterIntegrationTest {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
 
-  protected static final String SCHEMA_FILE = 
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
-  protected static final String TIME_COLUMN = "DaysSinceEpoch";
-  // TODO: N clusters instead of 2 in future iterations.
-  protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1";
-  protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
-  protected static final ClusterConfig CLUSTER_1_CONFIG = new 
ClusterConfig(CLUSTER_1_NAME, 30000);
-  protected static final ClusterConfig CLUSTER_2_CONFIG = new 
ClusterConfig(CLUSTER_2_NAME, 40000);
-  protected static final String DEFAULT_TENANT = "DefaultTenant";
-  protected static final String LOGICAL_TABLE_NAME = "logical_table";
-  protected static final String LOGICAL_TABLE_NAME_2 = "logical_table_2";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE = 
"logical_federation_table_cluster1";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE = 
"logical_federation_table_cluster2";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_1_TABLE_2 = 
"logical_federation_table2_cluster1";
-  protected static final String LOGICAL_FEDERATION_CLUSTER_2_TABLE_2 = 
"logical_federation_table2_cluster2";
-  protected static final int TABLE_SIZE_CLUSTER_1 = 1500;
-  protected static final int TABLE_SIZE_CLUSTER_2 = 1000;
-  protected static final int SEGMENTS_PER_CLUSTER = 3;
-  protected static final String JOIN_COLUMN = "OriginCityName";
-  protected static final String UNAVAILABLE_CLUSTER_NAME = 
"UnavailableCluster";
-  protected static final String UNAVAILABLE_ZK_ADDRESS = "localhost:29999";
-
-  protected ClusterComponents _cluster1;
-  protected ClusterComponents _cluster2;
-  protected ClusterComponents _brokerWithUnavailableCluster;
-  protected List<File> _cluster1AvroFiles;
-  protected List<File> _cluster2AvroFiles;
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    LOGGER.info("Setting up MultiClusterIntegrationTest");
-
-    // Initialize cluster components
-    _cluster1 = new ClusterComponents();
-    _cluster2 = new ClusterComponents();
-
-    // Setup directories
-    setupDirectories();
-
-    // Start ZooKeeper instances for both clusters
-    startZookeeper(_cluster1);
-    startZookeeper(_cluster2);
-
-    // Start controllers for both clusters
-    startControllerInit(_cluster1, CLUSTER_1_CONFIG);
-    startControllerInit(_cluster2, CLUSTER_2_CONFIG);
-
-    // Start brokers and servers for both clusters
-    // Note: Each cluster's broker is configured to know about the other 
cluster as remote
-    startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
-    startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
-
-    // Start an alternate broker with one valid and one unavailable remote 
cluster
-    startBrokerWithUnavailableCluster();
-
-    LOGGER.info("MultiClusterIntegrationTest setup complete");
+  // Logical tables are the federated tables that span across clusters
+  protected String getLogicalTableName() {
+    return "federated_table1";
   }
 
-  /**
-   * Starts a broker configured with cluster2 (valid) and an unavailable 
cluster (invalid ZK).
-   */
-  private void startBrokerWithUnavailableCluster() throws Exception {
-    _brokerWithUnavailableCluster = new ClusterComponents();
-    _brokerWithUnavailableCluster._brokerPort = findAvailablePort(55000);
-
-    PinotConfiguration brokerConfig = new PinotConfiguration();
-    brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, 
_cluster1._zkUrl);
-    brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, CLUSTER_1_NAME);
-    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, 
ControllerTest.LOCAL_HOST);
-    brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, 
_brokerWithUnavailableCluster._brokerPort);
-    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
-    brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
-    brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
-    brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES,
-        CLUSTER_2_NAME + "," + UNAVAILABLE_CLUSTER_NAME);
-    
brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS,
 CLUSTER_2_NAME),
-        _cluster2._zkUrl);
-    
brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS,
 UNAVAILABLE_CLUSTER_NAME),
-        UNAVAILABLE_ZK_ADDRESS);
-
-    _brokerWithUnavailableCluster._brokerStarter = createBrokerStarter();
-    _brokerWithUnavailableCluster._brokerStarter.init(brokerConfig);
-    _brokerWithUnavailableCluster._brokerStarter.start();
-    LOGGER.info("Started broker with unavailable cluster on port {}", 
_brokerWithUnavailableCluster._brokerPort);
+  protected String getLogicalTableName2() {
+    return "federated_table2";
+  }
+
+  // Physical tables are the actual tables in each cluster
+  protected String getPhysicalTable1InCluster1() {
+    return "physical_table1_c1";
+  }
+
+  protected String getPhysicalTable1InCluster2() {
+    return "physical_table1_c2";
+  }
+
+  protected String getPhysicalTable2InCluster1() {
+    return "physical_table2_c1";
+  }
+
+  protected String getPhysicalTable2InCluster2() {
+    return "physical_table2_c2";
   }
 
-  // TODO: Add more tests for cross-cluster queries in subsequent iterations.
   @Test
   public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
     LOGGER.info("Testing that multi-cluster broker starts successfully and is 
queryable");
@@ -169,10 +73,6 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     // Verify both clusters' brokers are running 
(MultiClusterHelixBrokerStarter)
     assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be 
started");
     assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be 
started");
-    assertTrue(_cluster1._brokerStarter instanceof 
MultiClusterHelixBrokerStarter,
-        "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
-    assertTrue(_cluster2._brokerStarter instanceof 
MultiClusterHelixBrokerStarter,
-        "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
 
     // Setup a test table on both clusters
     String testTableName = "multicluster_test_table";
@@ -203,23 +103,22 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
 
   @BeforeGroups("query")
   public void setupTablesForQueryTests() throws Exception {
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster1._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME, 
_cluster2._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster1._controllerBaseApiUrl);
-    dropLogicalTableIfExists(LOGICAL_TABLE_NAME_2, 
_cluster2._controllerBaseApiUrl);
-    setupFirstLogicalFederatedTable();
-    setupSecondLogicalFederatedTable();
-    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME,
-      LOGICAL_FEDERATION_CLUSTER_1_TABLE, LOGICAL_FEDERATION_CLUSTER_2_TABLE);
-    createLogicalTableOnBothClusters(LOGICAL_TABLE_NAME_2,
-      LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
+    dropLogicalTableIfExists(getLogicalTableName(), 
_cluster1._controllerBaseApiUrl);
+    dropLogicalTableIfExists(getLogicalTableName(), 
_cluster2._controllerBaseApiUrl);
+    dropLogicalTableIfExists(getLogicalTableName2(), 
_cluster1._controllerBaseApiUrl);
+    dropLogicalTableIfExists(getLogicalTableName2(), 
_cluster2._controllerBaseApiUrl);
+    setupPhysicalTables();
+    createLogicalTableOnBothClusters(getLogicalTableName(),
+        getPhysicalTable1InCluster1(), getPhysicalTable1InCluster2());
+    createLogicalTableOnBothClusters(getLogicalTableName2(),
+        getPhysicalTable2InCluster1(), getPhysicalTable2InCluster2());
     cleanSegmentDirs();
-    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1), 
LOGICAL_FEDERATION_CLUSTER_1_TABLE, _cluster1);
-    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2), 
LOGICAL_FEDERATION_CLUSTER_2_TABLE, _cluster2);
+    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_1, 1), 
getPhysicalTable1InCluster1(), _cluster1);
+    loadDataIntoCluster(createAvroData(TABLE_SIZE_CLUSTER_2, 2), 
getPhysicalTable1InCluster2(), _cluster2);
     loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_1, 
1, SEGMENTS_PER_CLUSTER),
-      LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, _cluster1);
+        getPhysicalTable2InCluster1(), _cluster1);
     loadDataIntoCluster(createAvroDataMultipleSegments(TABLE_SIZE_CLUSTER_2, 
2, SEGMENTS_PER_CLUSTER),
-      LOGICAL_FEDERATION_CLUSTER_2_TABLE_2, _cluster2);
+        getPhysicalTable2InCluster2(), _cluster2);
   }
 
   @Test(dataProvider = "queryModes", groups = "query")
@@ -233,8 +132,8 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     if (isJoinQuery) {
       // Join query test
       String joinQuery = queryOptions
-          + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + 
LOGICAL_TABLE_NAME + " t1 "
-          + "JOIN " + LOGICAL_TABLE_NAME_2 + " t2 ON t1." + JOIN_COLUMN + " = 
t2." + JOIN_COLUMN + " "
+          + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + 
getLogicalTableName() + " t1 "
+          + "JOIN " + getLogicalTableName2() + " t2 ON t1." + JOIN_COLUMN + " 
= t2." + JOIN_COLUMN + " "
           + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
       String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
       assertNotNull(result);
@@ -244,498 +143,189 @@ public class MultiClusterIntegrationTest extends 
ClusterTest {
     }
 
     // Count query test (all modes)
-    String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " + 
LOGICAL_TABLE_NAME;
+    String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " + 
getLogicalTableName();
     String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
     assertEquals(parseCountResult(countResult), expectedTotal);
     verifyUnavailableClusterException(countResult, expectUnavailableException);
   }
 
-  /**
-   * Data provider for all query mode combinations: broker mode x query 
engine/options.
-   * Each test case has: testName, queryOptions, isJoinQuery, brokerPort, 
expectUnavailableException
-   */
-  @DataProvider(name = "queryModes")
-  public Object[][] queryModes() {
-    int normalBroker = _cluster1._brokerPort;
-    int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
-
-    String sseOpts = "SET enableMultiClusterRouting=true; ";
-    String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
-    String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
-    String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
-
-    return new Object[][]{
-        // SSE tests (count only)
-        {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
-        {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
-        // MSE tests (join + count)
-        {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
-        {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
-        // Physical optimizer tests (join + count)
-        {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker, 
false},
-        {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true, 
unavailableBroker, true},
-        // MSELiteMode tests (join + count)
-        {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
-        {"MSELiteMode-UnavailableBroker", mseLiteOpts, true, 
unavailableBroker, true},
-    };
-  }
-
-  @Override
-  protected BaseBrokerStarter createBrokerStarter() {
-    return new MultiClusterHelixBrokerStarter();
-  }
-
-  protected static class ClusterConfig {
-    final String _name;
-    final int _basePort;
+  @Test(dataProvider = "queryModesWithoutMultiClusterOption", groups = "query")
+  public void testQueriesWithoutMultiClusterOptions(String testName, String 
queryOptions, boolean isJoinQuery,
+      int brokerPort) throws Exception {
+    LOGGER.info("Running {} on broker port {} (expecting NO federation)", 
testName, brokerPort);
 
-    ClusterConfig(String name, int basePort) {
-      _name = name;
-      _basePort = basePort;
-    }
-  }
+    // When enableMultiClusterRouting is NOT set, only local cluster data 
should be returned
+    long expectedLocalCount = TABLE_SIZE_CLUSTER_1;
 
-  protected static class ClusterComponents {
-    ZkStarter.ZookeeperInstance _zkInstance;
-    BaseControllerStarter _controllerStarter;
-    BaseBrokerStarter _brokerStarter;
-    BaseServerStarter _serverStarter;
-    int _controllerPort;
-    int _brokerPort;
-    int _serverPort;
-    String _zkUrl;
-    String _controllerBaseApiUrl;
-    File _tempDir;
-    File _segmentDir;
-    File _tarDir;
-  }
-
-  protected void setupDirectories() throws Exception {
-    setupClusterDirectories(_cluster1, "cluster1");
-    setupClusterDirectories(_cluster2, "cluster2");
-  }
-
-  private void setupClusterDirectories(ClusterComponents cluster, String 
clusterPrefix) throws Exception {
-    cluster._tempDir = new File(FileUtils.getTempDirectory(), clusterPrefix + 
"_" + getClass().getSimpleName());
-    cluster._segmentDir = new File(cluster._tempDir, "segmentDir");
-    cluster._tarDir = new File(cluster._tempDir, "tarDir");
-    TestUtils.ensureDirectoriesExistAndEmpty(cluster._tempDir, 
cluster._segmentDir, cluster._tarDir);
-  }
-
-  protected void startZookeeper(ClusterComponents cluster) throws Exception {
-    cluster._zkInstance = ZkStarter.startLocalZkServer();
-    cluster._zkUrl = cluster._zkInstance.getZkUrl();
-  }
-
-  protected void startControllerInit(ClusterComponents cluster, ClusterConfig 
config) throws Exception {
-    cluster._controllerPort = findAvailablePort(config._basePort);
-    startController(cluster, config);
-  }
-
-  protected void startCluster(ClusterComponents cluster, ClusterComponents 
remoteCluster,
-      ClusterConfig config) throws Exception {
-    cluster._brokerPort = findAvailablePort(cluster._controllerPort + 1000);
-    startBroker(cluster, remoteCluster, config);
-    cluster._serverPort = findAvailablePort(cluster._brokerPort + 1000);
-    startServerWithMSE(cluster, config);
-  }
-
-  protected void startController(ClusterComponents cluster, ClusterConfig 
config) throws Exception {
-    Map<String, Object> controllerConfig = new HashMap<>();
-    controllerConfig.put(ControllerConf.ZK_STR, cluster._zkUrl);
-    controllerConfig.put(ControllerConf.HELIX_CLUSTER_NAME, config._name);
-    controllerConfig.put(ControllerConf.CONTROLLER_HOST, 
ControllerTest.LOCAL_HOST);
-    controllerConfig.put(ControllerConf.CONTROLLER_PORT, 
cluster._controllerPort);
-    controllerConfig.put(ControllerConf.DATA_DIR, 
cluster._tempDir.getAbsolutePath());
-    controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, 
cluster._tempDir.getAbsolutePath());
-    controllerConfig.put(ControllerConf.DISABLE_GROOVY, false);
-    controllerConfig.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
-    controllerConfig.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
-
-    cluster._controllerStarter = createControllerStarter();
-    cluster._controllerStarter.init(new PinotConfiguration(controllerConfig));
-    cluster._controllerStarter.start();
-    cluster._controllerBaseApiUrl = "http://localhost:" + 
cluster._controllerPort;
-  }
-
-  protected void startBroker(ClusterComponents cluster, ClusterComponents 
remoteCluster,
-      ClusterConfig config) throws Exception {
-    PinotConfiguration brokerConfig = new PinotConfiguration();
-    brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
-    String remoteClusterName = CLUSTER_1_NAME.equalsIgnoreCase(config._name) ? 
CLUSTER_2_NAME : CLUSTER_1_NAME;
-    brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES, 
remoteClusterName);
-    
brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS,
 remoteClusterName),
-        remoteCluster._zkUrl);
-    brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
-    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, 
ControllerTest.LOCAL_HOST);
-    brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, 
cluster._brokerPort);
-    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
-    brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
-    brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
-    cluster._brokerStarter = createBrokerStarter();
-    cluster._brokerStarter.init(brokerConfig);
-    cluster._brokerStarter.start();
-  }
-
-  protected void startServerWithMSE(ClusterComponents cluster, ClusterConfig 
config) throws Exception {
-    PinotConfiguration serverConfig = new PinotConfiguration();
-    serverConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
-    serverConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
-    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, 
ControllerTest.LOCAL_HOST);
-    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, 
cluster._tempDir + "/dataDir");
-    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, 
cluster._tempDir + "/segmentTar");
-    serverConfig.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
-    serverConfig.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, 
false);
-    serverConfig.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, 
findAvailablePort(cluster._serverPort));
-    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_PORT, 
findAvailablePort(cluster._serverPort + 1));
-    serverConfig.setProperty(Server.CONFIG_OF_GRPC_PORT, 
findAvailablePort(cluster._serverPort + 2));
-    
serverConfig.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, 
true);
-    serverConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
-    serverConfig.setProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
-
-    cluster._serverStarter = createServerStarter();
-    cluster._serverStarter.init(serverConfig);
-    cluster._serverStarter.start();
-  }
-
-  protected int findAvailablePort(int basePort) {
-    try {
-      return NetUtils.findOpenPort(basePort);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to find available port starting from 
" + basePort, e);
-    }
-  }
-
-  protected List<File> createAvroData(int dataSize, int clusterId) throws 
Exception {
-    return createAvroDataMultipleSegments(dataSize, clusterId, 1);
-  }
-
-  protected List<File> createAvroDataMultipleSegments(int totalDataSize, int 
clusterId, int numSegments)
-      throws Exception {
-    Schema schema = createSchema(SCHEMA_FILE);
-    org.apache.avro.Schema avroSchema = createAvroSchema(schema);
-    File tempDir = (clusterId == 1) ? _cluster1._tempDir : _cluster2._tempDir;
-    List<File> avroFiles = new ArrayList<>();
-
-    for (int segment = 0; segment < numSegments; segment++) {
-      File avroFile = new File(tempDir, "cluster" + clusterId + 
"_data_segment" + segment + ".avro");
-      try (DataFileWriter<GenericData.Record> writer = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-        writer.create(avroSchema, avroFile);
-        int start = segment * (totalDataSize / numSegments);
-        int end = (segment == numSegments - 1) ? totalDataSize : (segment + 1) 
* (totalDataSize / numSegments);
-        for (int i = start; i < end; i++) {
-          GenericData.Record record = new GenericData.Record(avroSchema);
-          for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-            record.put(fieldSpec.getName(), 
generateFieldValue(fieldSpec.getName(), i, clusterId,
-                fieldSpec.getDataType()));
-          }
-          writer.append(record);
-        }
-      }
-      avroFiles.add(avroFile);
-    }
-    return avroFiles;
-  }
-
-  private org.apache.avro.Schema createAvroSchema(Schema schema) {
-    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
-    List<org.apache.avro.Schema.Field> fields = new ArrayList<>();
-
-    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-      org.apache.avro.Schema.Type avroType = 
getAvroType(fieldSpec.getDataType());
-      fields.add(new org.apache.avro.Schema.Field(fieldSpec.getName(),
-          org.apache.avro.Schema.create(avroType), null, null));
-    }
-    avroSchema.setFields(fields);
-    return avroSchema;
-  }
-
-  private org.apache.avro.Schema.Type getAvroType(FieldSpec.DataType type) {
-    switch (type) {
-      case INT: return org.apache.avro.Schema.Type.INT;
-      case LONG: return org.apache.avro.Schema.Type.LONG;
-      case FLOAT: return org.apache.avro.Schema.Type.FLOAT;
-      case DOUBLE: return org.apache.avro.Schema.Type.DOUBLE;
-      case BOOLEAN: return org.apache.avro.Schema.Type.BOOLEAN;
-      default: return org.apache.avro.Schema.Type.STRING;
-    }
-  }
-
-  private Object generateFieldValue(String fieldName, int index, int 
clusterId, FieldSpec.DataType dataType) {
-    int baseValue = index + (clusterId * 10000);
-    switch (dataType) {
-      case INT: return index + 10000;
-      case LONG: return (long) baseValue;
-      case FLOAT: return (float) (baseValue + 0.1);
-      case DOUBLE: return (double) (baseValue + 0.1);
-      case BOOLEAN: return (baseValue % 2) == 0;
-      default: return "cluster_" + fieldName + "_" + index;
-    }
-  }
+    if (isJoinQuery) {
+      // Join query test - should only execute locally
+      String joinQuery = queryOptions
+          + "SELECT t1." + JOIN_COLUMN + ", COUNT(*) as count FROM " + 
getLogicalTableName() + " t1 "
+          + "JOIN " + getLogicalTableName2() + " t2 ON t1." + JOIN_COLUMN + " 
= t2." + JOIN_COLUMN + " "
+          + "GROUP BY t1." + JOIN_COLUMN + " LIMIT 20";
+      String result = executeQueryOnBrokerPort(joinQuery, brokerPort);
+      assertNotNull(result);
+      assertTrue(result.contains("resultTable"), "Expected resultTable in 
response: " + result);
 
-  protected void loadDataIntoCluster(List<File> avroFiles, String tableName, 
ClusterComponents cluster)
-      throws Exception {
-    cleanDirectories(cluster._segmentDir, cluster._tarDir);
-    Schema schema = createSchema(SCHEMA_FILE);
-    schema.setSchemaName(tableName);
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
-        .setTableName(tableName)
-        .setTimeColumnName(TIME_COLUMN)
-        .build();
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0,
-        cluster._segmentDir, cluster._tarDir);
-    uploadSegmentsToCluster(tableName, cluster._tarDir, 
cluster._controllerBaseApiUrl);
-    Thread.sleep(2000);
-  }
+      // Verify no federation occurred by checking that only local cluster 
data is present
+      JsonNode rows = 
JsonMapper.builder().build().readTree(result).get("resultTable").get("rows");
+      assertNotNull(rows, "Result rows should exist");
 
-  private void cleanDirectories(File... dirs) {
-    for (File dir : dirs) {
-      try {
-        FileUtils.cleanDirectory(dir);
-      } catch (IOException e) {
-        // Ignore cleanup errors
+      for (JsonNode row : rows) {
+        int count = row.get(1).asInt();
+        assertTrue(count == 1,
+            "Expected local-only join count of 1, but got " + count + " for 
row: " + row);
       }
     }
-  }
 
-  protected void uploadSegmentsToCluster(String tableName, File tarDir, String 
controllerBaseApiUrl) throws Exception {
-    File[] segmentTarFiles = tarDir.listFiles();
-    assertNotNull(segmentTarFiles);
-    assertTrue(segmentTarFiles.length > 0);
+    // Count query test - should only count local cluster data
+    String countQuery = queryOptions + "SELECT COUNT(*) as count FROM " + 
getLogicalTableName();
+    String countResult = executeQueryOnBrokerPort(countQuery, brokerPort);
+    long actualCount = parseCountResult(countResult);
 
-    URI uploadSegmentHttpURI = URI.create(controllerBaseApiUrl + "/segments");
+    assertEquals(actualCount, expectedLocalCount,
+        "Expected local cluster count of " + expectedLocalCount + " but got " 
+ actualCount);
 
-    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
-      for (File segmentTarFile : segmentTarFiles) {
-        int status = 
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
-                segmentTarFile.getName(), segmentTarFile, List.of(), 
tableName, TableType.OFFLINE)
-            .getStatusCode();
-        assertEquals(status, HttpStatus.SC_OK);
+    // Verify no federation-related exceptions
+    JsonNode resultJson = JsonMapper.builder().build().readTree(countResult);
+    JsonNode exceptions = resultJson.get("exceptions");
+    if (exceptions != null && exceptions.size() > 0) {
+      for (JsonNode ex : exceptions) {
+        String message = ex.get("message").asText();
+        assertTrue(!message.contains("multicluster") && 
!message.contains("federation"),
+            "Unexpected multicluster/federation exception: " + message);
       }
     }
 
-    Thread.sleep(3000);
-  }
-
-
-  protected void createSchemaAndTableForCluster(String tableName, String 
controllerBaseApiUrl) throws IOException {
-    Schema schema = createSchema(SCHEMA_FILE);
-    schema.setSchemaName(tableName);
-    addSchemaToCluster(schema, controllerBaseApiUrl);
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
-        .setTableName(tableName)
-        .setTimeColumnName(TIME_COLUMN)
-        .build();
-    addTableConfigToCluster(tableConfig, controllerBaseApiUrl);
-  }
-
-  protected void createSchemaAndTableOnBothClusters(String tableName) throws 
Exception {
-    dropTableAndSchemaIfExists(tableName, _cluster1._controllerBaseApiUrl);
-    dropTableAndSchemaIfExists(tableName, _cluster2._controllerBaseApiUrl);
-    createSchemaAndTableForCluster(tableName, _cluster1._controllerBaseApiUrl);
-    createSchemaAndTableForCluster(tableName, _cluster2._controllerBaseApiUrl);
-  }
-
-  protected void dropTableAndSchemaIfExists(String tableName, String 
controllerBaseApiUrl) {
-    dropResource(controllerBaseApiUrl + "/tables/" + tableName);
-    dropResource(controllerBaseApiUrl + "/schemas/" + tableName);
+    LOGGER.info("Verified {} returned only local cluster data (count={})", 
testName, actualCount);
   }
 
-  private void dropResource(String url) {
-    try {
-      ControllerTest.sendDeleteRequest(url);
-    } catch (Exception e) {
-      // Ignore
-    }
-  }
-
-  protected void addSchemaToCluster(Schema schema, String 
controllerBaseApiUrl) throws IOException {
-    String url = controllerBaseApiUrl + "/schemas";
-    String schemaJson = schema.toPrettyJsonString();
-    String response = ControllerTest.sendPostRequest(url, schemaJson);
-    assertNotNull(response);
+  @Test(dataProvider = "physicalTableQueryModes", groups = "query")
+  public void testPhysicalTablesAlwaysQueryLocalCluster(String testName, 
String queryOptions, int brokerPort,
+      boolean expectValidationError)
+      throws Exception {
+    String physicalTableName = getPhysicalTable1InCluster1() + "_OFFLINE";
+    verifyPhysicalTableLocalOnly(physicalTableName, queryOptions, brokerPort, 
expectValidationError, testName);
   }
 
-  protected void addTableConfigToCluster(TableConfig tableConfig, String 
controllerBaseApiUrl) throws IOException {
-    String url = controllerBaseApiUrl + "/tables";
-    String tableConfigJson = JsonUtils.objectToPrettyString(tableConfig);
-    String response = ControllerTest.sendPostRequest(url, tableConfigJson);
-    assertNotNull(response);
-  }
+  @DataProvider(name = "queryModesWithoutMultiClusterOption")
+  public Object[][] queryModesWithoutMultiClusterOption() {
+    int normalBroker = _cluster1._brokerPort;
 
-  protected String executeQuery(String query, ClusterComponents cluster) 
throws Exception {
-    return executeQueryOnBrokerPort(query, cluster._brokerPort);
-  }
+    String noOpts = "";
+    String onlyMseOpts = "SET useMultistageEngine=true; ";
+    String onlyPhysOptOpts = "SET useMultistageEngine=true; SET 
usePhysicalOptimizer=true; ";
+    String onlyMseLiteOpts = "SET useMultistageEngine=true; SET 
usePhysicalOptimizer=true; SET runInBroker=true; ";
+    String explicitlyDisabledOpts = "SET enableMultiClusterRouting=false; ";
+    String explicitlyDisabledMseOpts = "SET enableMultiClusterRouting=false; 
SET useMultistageEngine=true; ";
 
-  protected String executeQueryOnBrokerPort(String query, int brokerPort) 
throws Exception {
-    Map<String, Object> payload = Map.of("sql", query);
-    String url = "http://localhost:" + brokerPort + "/query/sql";
-    return ControllerTest.sendPostRequest(url, 
JsonUtils.objectToPrettyString(payload));
+    return new Object[][]{
+        {"SSE-NoMultiClusterOption", noOpts, false, normalBroker},
+        {"SSE-ExplicitlyDisabled", explicitlyDisabledOpts, false, 
normalBroker},
+        {"MSE-NoMultiClusterOption", onlyMseOpts, true, normalBroker},
+        {"MSE-ExplicitlyDisabled", explicitlyDisabledMseOpts, true, 
normalBroker},
+        {"PhysicalOptimizer-NoMultiClusterOption", onlyPhysOptOpts, true, 
normalBroker},
+        {"MSELiteMode-NoMultiClusterOption", onlyMseLiteOpts, true, 
normalBroker},
+    };
   }
 
-  protected void verifyUnavailableClusterException(String result, boolean 
expectException) throws Exception {
-    if (expectException) {
-      assertTrue(result.contains(UNAVAILABLE_CLUSTER_NAME),
-          "Response should mention unavailable cluster: " + 
UNAVAILABLE_CLUSTER_NAME);
-      JsonNode resultJson = JsonMapper.builder().build().readTree(result);
-      JsonNode exceptions = resultJson.get("exceptions");
-      assertNotNull(exceptions, "Exceptions array should exist");
-      boolean found = false;
-      for (JsonNode ex : exceptions) {
-        if (ex.get("errorCode").asInt() == 510
-            && ex.get("message").asText().contains(UNAVAILABLE_CLUSTER_NAME)) {
-          found = true;
-          break;
-        }
-      }
-      assertTrue(found, "Should find REMOTE_CLUSTER_UNAVAILABLE (510) 
exception");
-    }
-  }
+  @DataProvider(name = "physicalTableQueryModes")
+  public Object[][] physicalTableQueryModes() {
+    int normalBroker = _cluster1._brokerPort;
 
-  protected long parseCountResult(String result) {
-    try {
-      JsonNode rows = 
JsonMapper.builder().build().readTree(result).path("resultTable").path("rows");
-      if (rows.isArray() && rows.size() > 0) {
-        JsonNode firstRow = rows.get(0);
-        if (firstRow.isArray() && firstRow.size() > 0) {
-          return Long.parseLong(firstRow.get(0).asText());
-        }
-      }
-    } catch (Exception e) {
-      // Ignore
-    }
-    return 0;
-  }
+    String withMultiClusterOpts = "SET enableMultiClusterRouting=true; ";
+    String withMultiClusterMseOpts = "SET enableMultiClusterRouting=true; SET useMultistageEngine=true; ";
+    String noOpts = "";
+    String onlyMseOpts = "SET useMultistageEngine=true; ";
+    String explicitlyDisabledOpts = "SET enableMultiClusterRouting=false; ";
 
-  protected Schema createSchema(String schemaFileName) throws IOException {
-    InputStream schemaInputStream = 
getClass().getClassLoader().getResourceAsStream(schemaFileName);
-    assertNotNull(schemaInputStream, "Schema file not found: " + 
schemaFileName);
-    return Schema.fromInputStream(schemaInputStream);
+    return new Object[][]{
+        {"PhysicalTable-SSE-WithMultiClusterRouting", withMultiClusterOpts, 
normalBroker, true},
+        {"PhysicalTable-MSE-WithMultiClusterRouting", withMultiClusterMseOpts, 
normalBroker, true},
+        {"PhysicalTable-SSE-NoOptions", noOpts, normalBroker, false},
+        {"PhysicalTable-MSE-NoOptions", onlyMseOpts, normalBroker, false},
+        {"PhysicalTable-SSE-ExplicitlyDisabled", explicitlyDisabledOpts, 
normalBroker, false},
+    };
   }
 
-  @AfterClass
-  public void tearDown() throws Exception {
-    // Stop the alternate broker with unavailable cluster
-    if (_brokerWithUnavailableCluster != null && 
_brokerWithUnavailableCluster._brokerStarter != null) {
-      try {
-        _brokerWithUnavailableCluster._brokerStarter.stop();
-      } catch (Exception e) {
-        LOGGER.warn("Error stopping broker with unavailable cluster", e);
-      }
-    }
-    stopCluster(_cluster1);
-    stopCluster(_cluster2);
-  }
+  @DataProvider(name = "queryModes")
+  public Object[][] queryModes() {
+    int normalBroker = _cluster1._brokerPort;
+    int unavailableBroker = _brokerWithUnavailableCluster._brokerPort;
 
-  private void stopCluster(ClusterComponents cluster) {
-    if (cluster == null) {
-      return;
-    }
-    try {
-      if (cluster._serverStarter != null) {
-        cluster._serverStarter.stop();
-      }
-      if (cluster._brokerStarter != null) {
-        cluster._brokerStarter.stop();
-      }
-      if (cluster._controllerStarter != null) {
-        cluster._controllerStarter.stop();
-      }
-      if (cluster._zkInstance != null) {
-        ZkStarter.stopLocalZkServer(cluster._zkInstance);
-      }
-      FileUtils.deleteQuietly(cluster._tempDir);
-    } catch (Exception e) {
-      LOGGER.warn("Error stopping cluster", e);
-    }
-  }
+    String sseOpts = "SET enableMultiClusterRouting=true; ";
+    String mseOpts = sseOpts + "SET useMultistageEngine=true; ";
+    String physOptOpts = mseOpts + "SET usePhysicalOptimizer=true; ";
+    String mseLiteOpts = physOptOpts + "SET runInBroker=true; ";
 
-  protected void cleanSegmentDirs() {
-    cleanDirectories(_cluster1._segmentDir, _cluster1._tarDir, 
_cluster2._segmentDir, _cluster2._tarDir);
+    return new Object[][]{
+        {"SSE-NormalBroker", sseOpts, false, normalBroker, false},
+        {"SSE-UnavailableBroker", sseOpts, false, unavailableBroker, true},
+        {"MSE-NormalBroker", mseOpts, true, normalBroker, false},
+        {"MSE-UnavailableBroker", mseOpts, true, unavailableBroker, true},
+        {"PhysicalOptimizer-NormalBroker", physOptOpts, true, normalBroker, 
false},
+        {"PhysicalOptimizer-UnavailableBroker", physOptOpts, true, 
unavailableBroker, true},
+        {"MSELiteMode-NormalBroker", mseLiteOpts, true, normalBroker, false},
+        {"MSELiteMode-UnavailableBroker", mseLiteOpts, true, 
unavailableBroker, true},
+    };
   }
 
-  protected long getCount(String tableName, ClusterComponents cluster, boolean 
enableMultiClusterRouting)
-      throws Exception {
-    String query = "SET enableMultiClusterRouting=" + 
enableMultiClusterRouting + "; SELECT COUNT(*) as count FROM "
-      + tableName;
-    return parseCountResult(executeQuery(query, cluster));
-  }
+  /**
+   * Setup physical tables based on configuration. By default, creates different table names
+   * in each cluster. Subclasses can override to create same-named tables.
+   */
+  protected void setupPhysicalTables() throws Exception {
+    // Cluster 1 tables
+    dropTableAndSchemaIfExists(getPhysicalTable1InCluster1(), 
_cluster1._controllerBaseApiUrl);
+    dropTableAndSchemaIfExists(getPhysicalTable2InCluster1(), 
_cluster1._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(getPhysicalTable1InCluster1(), 
_cluster1._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(getPhysicalTable2InCluster1(), 
_cluster1._controllerBaseApiUrl);
 
-  /*
-  Logical table helper methods
-  */
-  protected void createLogicalTable(String schemaFile,
-      Map<String, PhysicalTableConfig> physicalTableConfigMap, String 
brokerTenant, String controllerBaseApiUrl,
-      String logicalTable, String refOfflineTable, String refRealtimeTable) 
throws IOException {
-    ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(controllerBaseApiUrl);
-    ControllerRequestClient client = new ControllerRequestClient(urlBuilder, 
getHttpClient(),
-        getControllerRequestClientHeaders());
-    Schema schema = createSchema(schemaFile);
-    schema.setSchemaName(logicalTable);
-    client.addSchema(schema);
-    LogicalTableConfig config = new LogicalTableConfigBuilder()
-        .setTableName(logicalTable)
-        .setBrokerTenant(brokerTenant)
-        .setRefOfflineTableName(refOfflineTable)
-        .setRefRealtimeTableName(refRealtimeTable)
-        .setPhysicalTableConfigMap(physicalTableConfigMap)
-        .build();
-    String response = 
ControllerTest.sendPostRequest(urlBuilder.forLogicalTableCreate(),
-        config.toSingleLineJsonString(), Map.of());
-    assertEquals(response, "{\"unrecognizedProperties\":{},\"status\":\"" + 
logicalTable
-        + " logical table successfully added.\"}");
+    // Cluster 2 tables
+    dropTableAndSchemaIfExists(getPhysicalTable1InCluster2(), 
_cluster2._controllerBaseApiUrl);
+    dropTableAndSchemaIfExists(getPhysicalTable2InCluster2(), 
_cluster2._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(getPhysicalTable1InCluster2(), 
_cluster2._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(getPhysicalTable2InCluster2(), 
_cluster2._controllerBaseApiUrl);
   }
 
   protected void createLogicalTableOnBothClusters(String logicalTableName,
       String cluster1PhysicalTable, String cluster2PhysicalTable) throws 
IOException {
-    // For cluster 1: cluster1's table is local (isMultiCluster=false), 
cluster2's table is remote (isMultiCluster=true)
-    Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap = Map.of(
-        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false),
-        cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true)
-    );
-
-    // For cluster 2: cluster2's table is local (isMultiCluster=false), 
cluster1's table is remote (isMultiCluster=true)
-    Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap = Map.of(
-        cluster1PhysicalTable + "_OFFLINE", new PhysicalTableConfig(true),
-        cluster2PhysicalTable + "_OFFLINE", new PhysicalTableConfig(false)
-    );
+    // Automatically detect if same-named tables: if so, both must be marked as multi-cluster (true)
+    // If different names: local table is false, remote table is true
+    boolean areSameNamedTables = 
cluster1PhysicalTable.equals(cluster2PhysicalTable);
+
+    Map<String, PhysicalTableConfig> cluster1PhysicalTableConfigMap;
+    Map<String, PhysicalTableConfig> cluster2PhysicalTableConfigMap;
+
+    if (areSameNamedTables) {
+      // Same table name: only one entry, marked as multi-cluster
+      cluster1PhysicalTableConfigMap = Map.of(
+          cluster1PhysicalTable + "_OFFLINE",
+          new PhysicalTableConfig(true)  // Same-named table must be marked as multi-cluster
+      );
+      cluster2PhysicalTableConfigMap = Map.of(
+          cluster1PhysicalTable + "_OFFLINE",
+          new PhysicalTableConfig(true)  // Same-named table must be marked as multi-cluster
+      );
+    } else {
+      // Different table names: local is false, remote is true
+      cluster1PhysicalTableConfigMap = Map.of(
+          cluster1PhysicalTable + "_OFFLINE",
+          new PhysicalTableConfig(false),  // Local table
+          cluster2PhysicalTable + "_OFFLINE",
+          new PhysicalTableConfig(true)  // Remote table
+      );
+      cluster2PhysicalTableConfigMap = Map.of(
+          cluster1PhysicalTable + "_OFFLINE",
+          new PhysicalTableConfig(true),  // Remote table
+          cluster2PhysicalTable + "_OFFLINE",
+          new PhysicalTableConfig(false)  // Local table
+      );
+    }
 
     createLogicalTable(SCHEMA_FILE, cluster1PhysicalTableConfigMap, 
DEFAULT_TENANT,
         _cluster1._controllerBaseApiUrl, logicalTableName, 
cluster1PhysicalTable + "_OFFLINE", null);
     createLogicalTable(SCHEMA_FILE, cluster2PhysicalTableConfigMap, 
DEFAULT_TENANT,
         _cluster2._controllerBaseApiUrl, logicalTableName, 
cluster2PhysicalTable + "_OFFLINE", null);
   }
-
-  protected void dropLogicalTableIfExists(String logicalTableName, String 
controllerBaseApiUrl) {
-    dropResource(controllerBaseApiUrl + "/logicalTables/" + logicalTableName);
-  }
-
-  protected void setupFirstLogicalFederatedTable() throws Exception {
-    setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE);
-  }
-
-  protected void setupSecondLogicalFederatedTable() throws Exception {
-    setupLogicalFederatedTable(LOGICAL_FEDERATION_CLUSTER_1_TABLE_2, 
LOGICAL_FEDERATION_CLUSTER_2_TABLE_2);
-  }
-
-  protected void setupLogicalFederatedTable(String cluster1TableName, String 
cluster2TableName) throws Exception {
-    dropTableAndSchemaIfExists(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
-    dropTableAndSchemaIfExists(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
-    createSchemaAndTableForCluster(cluster1TableName, 
_cluster1._controllerBaseApiUrl);
-    createSchemaAndTableForCluster(cluster2TableName, 
_cluster2._controllerBaseApiUrl);
-  }
-
-  protected void assertResultRows(String resultJson) throws Exception {
-    JsonNode rows = 
JsonMapper.builder().build().readTree(resultJson).get("resultTable").get("rows");
-    assertNotNull(rows);
-    for (JsonNode row : rows) {
-      int number = Integer.parseInt(row.get(0).asText().split("_")[2]);
-      // Depending on the number of records with the same join key in each 
cluster, the expected count varies.
-      // If the number is less than the size of the smaller cluster, it should 
appear in both clusters,
-      // resulting in 4 records (2 from each cluster).
-      // Otherwise, it should appear only in one cluster, resulting in 1 
record.
-      int expectedCount = number < Math.min(TABLE_SIZE_CLUSTER_1, 
TABLE_SIZE_CLUSTER_2) ? 4 : 1;
-      assertEquals(row.get(1).asInt(), expectedCount);
-    }
-  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/SameTableNameMultiClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/SameTableNameMultiClusterIntegrationTest.java
new file mode 100644
index 00000000000..29a58f8980a
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/SameTableNameMultiClusterIntegrationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.multicluster;
+
+/**
+ * Integration tests for multi-cluster routing when the SAME physical table name exists in both clusters.
+ * This class extends {@link MultiClusterIntegrationTest} and inherits all its tests, but configures
+ * both clusters to use identical physical table names (unlike the parent which uses different names).
+ */
+public class SameTableNameMultiClusterIntegrationTest extends 
MultiClusterIntegrationTest {
+  /**
+   * Override to use cluster1's table name in cluster2 as well (same physical table name).
+   */
+  @Override
+  protected String getPhysicalTable1InCluster2() {
+    return getPhysicalTable1InCluster1();
+  }
+
+  /**
+   * Override to use cluster1's table name in cluster2 as well (same physical table name).
+   */
+  @Override
+  protected String getPhysicalTable2InCluster2() {
+    return getPhysicalTable2InCluster1();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to