ankitsultana commented on code in PR #17296:
URL: https://github.com/apache/pinot/pull/17296#discussion_r2629189233


##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/MultiClusterRoutingManagerTest.java:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.SegmentsToQuery;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link MultiClusterRoutingManager}.
+ */
+public class MultiClusterRoutingManagerTest {
+  private static final String TEST_TABLE = "testTable_OFFLINE";
+  private static final long REQUEST_ID = 12345L;
+
+  @Mock
+  private BrokerRoutingManager _localClusterRoutingManager;
+
+  @Mock
+  private RemoteClusterBrokerRoutingManager _remoteClusterRoutingManager1;
+
+  @Mock
+  private RemoteClusterBrokerRoutingManager _remoteClusterRoutingManager2;
+
+  private MultiClusterRoutingManager _multiClusterRoutingManager;
+
+  @BeforeMethod
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+
+    List<RemoteClusterBrokerRoutingManager> remoteClusterManagers = 
Arrays.asList(
+        _remoteClusterRoutingManager1, _remoteClusterRoutingManager2);
+    _multiClusterRoutingManager = new MultiClusterRoutingManager(
+        _localClusterRoutingManager, remoteClusterManagers);
+  }
+
+  @Test
+  public void testRoutingExistsShortCircuits() {
+    
when(_localClusterRoutingManager.routingExists(TEST_TABLE)).thenReturn(true);
+
+    boolean exists = _multiClusterRoutingManager.routingExists(TEST_TABLE);
+
+    assertTrue(exists);
+    verify(_remoteClusterRoutingManager1, never()).routingExists(anyString());
+  }
+
+  @Test
+  public void testRoutingExistsChecksRemote() {
+    
when(_localClusterRoutingManager.routingExists(TEST_TABLE)).thenReturn(false);
+    
when(_remoteClusterRoutingManager1.routingExists(TEST_TABLE)).thenReturn(false);
+    
when(_remoteClusterRoutingManager2.routingExists(TEST_TABLE)).thenReturn(true);
+
+    boolean exists = _multiClusterRoutingManager.routingExists(TEST_TABLE);
+
+    assertTrue(exists);
+  }
+
+  @Test
+  public void testGetRoutingTableCombinesLocalAndRemote() {
+    BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+
+    RoutingTable localTable = createRoutingTable("localServer", 
Arrays.asList("seg1"));
+    RoutingTable remoteTable = createRoutingTable("remoteServer", 
Arrays.asList("seg2"));
+
+    when(_localClusterRoutingManager.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(localTable);
+    when(_remoteClusterRoutingManager1.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(remoteTable);
+    when(_remoteClusterRoutingManager2.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(null);
+
+    RoutingTable result = 
_multiClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, 
REQUEST_ID);
+
+    assertNotNull(result);
+    assertEquals(result.getServerInstanceToSegmentsMap().size(), 2);
+  }
+
+  @Test
+  public void testGetRoutingTableHandlesRemoteException() {
+    BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+    RoutingTable localTable = createRoutingTable("localServer", 
Arrays.asList("seg1"));
+
+    when(_localClusterRoutingManager.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(localTable);
+    when(_remoteClusterRoutingManager1.getRoutingTable(any(), anyString(), 
anyLong()))
+        .thenThrow(new RuntimeException("Remote error"));
+    when(_remoteClusterRoutingManager2.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(null);
+
+    RoutingTable result = 
_multiClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, 
REQUEST_ID);
+
+    assertNotNull(result);
+    assertEquals(result.getServerInstanceToSegmentsMap().size(), 1);
+  }
+
+  @Test
+  public void testGetRoutingTableReturnsNullWhenAllNull() {
+    BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+    when(_localClusterRoutingManager.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(null);
+    when(_remoteClusterRoutingManager1.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(null);
+    when(_remoteClusterRoutingManager2.getRoutingTable(brokerRequest, 
TEST_TABLE, REQUEST_ID))
+        .thenReturn(null);
+
+    RoutingTable result = 
_multiClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, 
REQUEST_ID);
+
+    assertNull(result);
+  }
+
+  @Test
+  public void testGetTimeBoundaryInfoFindsFirst() {
+    TimeBoundaryInfo timeBoundaryInfo = mock(TimeBoundaryInfo.class);
+    
when(_localClusterRoutingManager.getTimeBoundaryInfo(TEST_TABLE)).thenReturn(null);
+    
when(_remoteClusterRoutingManager1.getTimeBoundaryInfo(TEST_TABLE)).thenReturn(timeBoundaryInfo);
+
+    TimeBoundaryInfo result = 
_multiClusterRoutingManager.getTimeBoundaryInfo(TEST_TABLE);
+
+    assertNotNull(result);
+    assertEquals(result, timeBoundaryInfo);
+    verify(_remoteClusterRoutingManager2, 
never()).getTimeBoundaryInfo(anyString());
+  }
+
+  @Test
+  public void testGetServingInstancesCombinesAll() {
+    Set<String> localInstances = new HashSet<>(Arrays.asList("server1"));
+    Set<String> remoteInstances = new HashSet<>(Arrays.asList("server2"));
+
+    
when(_localClusterRoutingManager.getServingInstances(TEST_TABLE)).thenReturn(localInstances);
+    
when(_remoteClusterRoutingManager1.getServingInstances(TEST_TABLE)).thenReturn(remoteInstances);
+    
when(_remoteClusterRoutingManager2.getServingInstances(TEST_TABLE)).thenReturn(null);
+
+    Set<String> result = 
_multiClusterRoutingManager.getServingInstances(TEST_TABLE);
+
+    assertNotNull(result);
+    assertEquals(result.size(), 2);
+    assertTrue(result.contains("server1"));
+    assertTrue(result.contains("server2"));
+  }
+
+  @Test
+  public void testGetServingInstancesHandlesException() {
+    Set<String> localInstances = new HashSet<>(Arrays.asList("server1"));
+    
when(_localClusterRoutingManager.getServingInstances(TEST_TABLE)).thenReturn(localInstances);
+    when(_remoteClusterRoutingManager1.getServingInstances(TEST_TABLE))
+        .thenThrow(new RuntimeException("Error"));
+
+    Set<String> result = 
_multiClusterRoutingManager.getServingInstances(TEST_TABLE);
+
+    assertNotNull(result);
+    assertEquals(result.size(), 1);
+  }
+
+  @Test
+  public void testGetSegmentsHandlesNulls() {
+    BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+    List<String> remoteSegments = Arrays.asList("seg1");
+
+    
when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null);
+    
when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(remoteSegments);
+    
when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null);
+
+    List<String> result = 
_multiClusterRoutingManager.getSegments(brokerRequest);
+
+    assertNotNull(result);
+    assertEquals(result.size(), 1);
+    assertTrue(result.contains("seg1"));
+  }
+
+  @Test
+  public void testGetSegmentsReturnsNullWhenAllNull() {
+    BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+    
when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null);
+    
when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(null);
+    
when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null);
+
+    List<String> result = 
_multiClusterRoutingManager.getSegments(brokerRequest);
+
+    assertNull(result);
+  }
+
+  @Test
+  public void testGetEnabledServerInstanceMapCombinesAll() {

Review Comment:
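   Interesting: an instanceId is only guaranteed to be unique within a single cluster.
   
   Do we need to think about this? What would happen if two clusters have an instance with the same id?
   `getEnabledServerInstanceMap()` merges the per-cluster maps with `putAll`, so a colliding entry from a
   remote cluster would silently overwrite the local (or an earlier remote) one. Maybe we can scope the key
   with a cluster prefix, like `cluster-name::instance-id`.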



##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/RemoteClusterBrokerRoutingManagerTest.java:
##########
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.controller.helix.ControllerTest;
+import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+/**
+ * Test class for {@link RemoteClusterBrokerRoutingManager}.
+ * Tests the remote cluster routing manager with real ZooKeeper to validate 
table discovery,
+ * routing updates, concurrent operations, and lifecycle management.
+ */
+public class RemoteClusterBrokerRoutingManagerTest extends ControllerTest {
+  private static final String REMOTE_CLUSTER_NAME = "remoteCluster";
+  private static final String RAW_TABLE_NAME = "remoteTable";
+  private static final String OFFLINE_TABLE = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String SERVER_INSTANCE_1 = "Server_remote_host1_9000";
+  private static final String SERVER_INSTANCE_2 = "Server_remote_host2_9000";
+
+  private BrokerMetrics _brokerMetrics;
+  private ServerRoutingStatsManager _serverRoutingStatsManager;
+  private PinotConfiguration _pinotConfig;
+  private RemoteClusterBrokerRoutingManager _routingManager;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    startZk();
+    startController();
+
+    _brokerMetrics = Mockito.mock(BrokerMetrics.class);
+    _serverRoutingStatsManager = Mockito.mock(ServerRoutingStatsManager.class);
+    _pinotConfig = Mockito.mock(PinotConfiguration.class);
+
+    
Mockito.when(_pinotConfig.getProperty(Mockito.eq("pinot.broker.adaptive.server.selector.type")))
+        .thenReturn("UNIFORM_RANDOM");
+    Mockito.when(_pinotConfig.getProperty(
+        
Mockito.eq(CommonConstants.Broker.CONFIG_OF_ROUTING_ASSIGNMENT_CHANGE_PROCESS_PARALLELISM),
 anyInt()))
+        .thenReturn(10);
+    Mockito.when(_pinotConfig.getProperty(anyString(), anyString()))
+        .thenAnswer(invocation -> invocation.getArgument(1));
+    Mockito.when(_pinotConfig.getProperty(anyString(), anyInt()))
+        .thenAnswer(invocation -> invocation.getArgument(1));
+
+    _routingManager = new RemoteClusterBrokerRoutingManager(
+        REMOTE_CLUSTER_NAME, _brokerMetrics, _serverRoutingStatsManager, 
_pinotConfig);
+    _routingManager.init(_helixManager);
+
+    addServerInstances();
+    triggerInstanceConfigProcessing();
+  }
+
+  private void triggerInstanceConfigProcessing() {
+    try {
+      
_routingManager.processClusterChange(org.apache.helix.HelixConstants.ChangeType.INSTANCE_CONFIG);
+    } catch (Exception e) {
+      Assert.fail("Failed to process instance config", e);
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    if (_routingManager != null) {
+      _routingManager.shutdown();
+    }
+    stopController();
+    stopZk();
+  }
+
+  private void addServerInstances() {
+    String clusterName = getHelixClusterName();
+
+    for (String serverInstanceId : new String[]{SERVER_INSTANCE_1, 
SERVER_INSTANCE_2}) {
+      if 
(!_helixAdmin.getInstancesInCluster(clusterName).contains(serverInstanceId)) {
+        InstanceConfig instanceConfig = new InstanceConfig(serverInstanceId);
+        instanceConfig.setHostName(serverInstanceId.split("_")[1]);
+        instanceConfig.setPort("9000");
+        instanceConfig.setInstanceEnabled(true);
+        _helixAdmin.addInstance(clusterName, instanceConfig);
+        _helixAdmin.enableInstance(clusterName, serverInstanceId, true);
+      }
+    }
+  }
+
+  @Test
+  public void testTableDiscoveryAddsNewTable() throws Exception {
+    Assert.assertFalse(_routingManager.routingExists(OFFLINE_TABLE), "Table 
should not exist initially");
+
+    createTableInZooKeeper(OFFLINE_TABLE, TableType.OFFLINE);
+
+    _routingManager.processSegmentAssignmentChangeInternal();
+    Assert.assertTrue(_routingManager.hasRoutingChangeScheduled(), "Flag 
should be set after change");
+
+    _routingManager.determineRoutingChangeForTables();
+
+    Assert.assertTrue(_routingManager.routingExists(OFFLINE_TABLE), "Table 
should be discovered and added");
+    Assert.assertFalse(_routingManager.hasRoutingChangeScheduled(), "Flag 
should be consumed");
+  }
+
+  @Test
+  public void testTableRemovalDeletesRouting() throws Exception {
+    String tableToRemove = "tableToRemove_OFFLINE";
+    createTableInZooKeeper(tableToRemove, TableType.OFFLINE);
+
+    _routingManager.processSegmentAssignmentChangeInternal();
+    _routingManager.determineRoutingChangeForTables();
+    Assert.assertTrue(_routingManager.routingExists(tableToRemove), "Table 
should exist after discovery");
+
+    deleteTableFromZooKeeper(tableToRemove);
+
+    _routingManager.processSegmentAssignmentChangeInternal();
+    _routingManager.determineRoutingChangeForTables();
+
+    Assert.assertFalse(_routingManager.routingExists(tableToRemove), "Table 
should be removed from routing");
+  }
+
+  @Test
+  public void testConcurrentTableDiscoveryAndQueries() throws Exception {
+    String table1 = "concurrentTable1_OFFLINE";
+    String table2 = "concurrentTable2_OFFLINE";
+
+    createTableInZooKeeper(table1, TableType.OFFLINE);
+    _routingManager.processSegmentAssignmentChangeInternal();
+    _routingManager.determineRoutingChangeForTables();
+    Assert.assertTrue(_routingManager.routingExists(table1));
+
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(3);
+
+    AtomicReference<Exception> discoveryException = new AtomicReference<>();
+    AtomicReference<Exception> query1Exception = new AtomicReference<>();
+    AtomicReference<Exception> query2Exception = new AtomicReference<>();
+
+    try {
+      // Thread 1: Discover new table
+      Future<?> discoveryTask = executor.submit(() -> {
+        try {
+          startLatch.await();
+          createTableInZooKeeper(table2, TableType.OFFLINE);
+          _routingManager.processSegmentAssignmentChangeInternal();
+          _routingManager.determineRoutingChangeForTables();
+        } catch (Exception e) {
+          discoveryException.set(e);
+        } finally {
+          finishLatch.countDown();
+        }
+      });
+
+      // Thread 2: Query existing table
+      Future<?> query1Task = executor.submit(() -> {
+        try {
+          startLatch.await();
+          for (int i = 0; i < 10; i++) {
+            boolean exists = _routingManager.routingExists(table1);
+            Assert.assertTrue(exists, "Existing table should remain queryable 
during discovery");
+            Thread.sleep(5);
+          }
+        } catch (Exception e) {
+          query1Exception.set(e);
+        } finally {
+          finishLatch.countDown();
+        }
+      });
+
+      // Thread 3: Query both tables
+      Future<?> query2Task = executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(20); // Let discovery happen first
+          for (int i = 0; i < 5; i++) {
+            _routingManager.routingExists(table1);
+            _routingManager.routingExists(table2);
+            Thread.sleep(10);
+          }
+        } catch (Exception e) {
+          query2Exception.set(e);
+        } finally {
+          finishLatch.countDown();
+        }
+      });
+
+      startLatch.countDown();
+      Assert.assertTrue(finishLatch.await(10, TimeUnit.SECONDS), "Tasks should 
complete");
+
+      if (discoveryException.get() != null) {
+        Assert.fail("Discovery failed", discoveryException.get());
+      }
+      if (query1Exception.get() != null) {
+        Assert.fail("Query 1 failed", query1Exception.get());
+      }
+      if (query2Exception.get() != null) {
+        Assert.fail("Query 2 failed", query2Exception.get());
+      }
+
+      Assert.assertTrue(_routingManager.routingExists(table1), "Table 1 should 
exist");
+      Assert.assertTrue(_routingManager.routingExists(table2), "Table 2 should 
be discovered");
+    } finally {
+      executor.shutdown();
+      Assert.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+    }
+  }
+
+  @Test
+  public void testMultipleTableDiscoveryInParallel() throws Exception {
+    String[] tables = {
+        "parallelTable1_OFFLINE",
+        "parallelTable2_OFFLINE",
+        "parallelTable3_OFFLINE"
+    };
+
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(3);
+
+    AtomicReference<Exception> exception = new AtomicReference<>();
+
+    try {
+      for (int i = 0; i < tables.length; i++) {
+        final String tableName = tables[i];
+        executor.submit(() -> {
+          try {
+            startLatch.await();
+            createTableInZooKeeper(tableName, TableType.OFFLINE);
+          } catch (Exception e) {
+            exception.set(e);
+          } finally {
+            finishLatch.countDown();
+          }
+        });
+      }
+
+      startLatch.countDown();
+      Assert.assertTrue(finishLatch.await(10, TimeUnit.SECONDS), "Table 
creation should complete");
+
+      if (exception.get() != null) {
+        Assert.fail("Table creation failed", exception.get());
+      }
+
+      _routingManager.processSegmentAssignmentChangeInternal();
+      _routingManager.determineRoutingChangeForTables();
+
+      for (String tableName : tables) {
+        Assert.assertTrue(_routingManager.routingExists(tableName),
+            "Table " + tableName + " should be discovered");
+      }
+    } finally {
+      executor.shutdown();
+      Assert.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+    }
+  }
+
+  @Test
+  public void testRepeatedDiscoveryCalls() throws Exception {
+    String testTable = "repeatedTable_OFFLINE";
+    createTableInZooKeeper(testTable, TableType.OFFLINE);
+
+    for (int i = 0; i < 5; i++) {
+      _routingManager.processSegmentAssignmentChangeInternal();
+      _routingManager.determineRoutingChangeForTables();
+    }
+
+    Assert.assertTrue(_routingManager.routingExists(testTable),
+        "Table should be discovered after repeated calls");
+  }
+
+  @Test
+  public void testShutdownDuringDiscovery() throws Exception {
+    String testTable = "shutdownTable_OFFLINE";
+    createTableInZooKeeper(testTable, TableType.OFFLINE);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(2);
+
+    AtomicReference<Exception> discoveryException = new AtomicReference<>();
+    AtomicReference<Exception> shutdownException = new AtomicReference<>();
+
+    try {
+      // Thread 1: Start discovery
+      Future<?> discoveryTask = executor.submit(() -> {
+        try {
+          startLatch.await();
+          _routingManager.processSegmentAssignmentChangeInternal();
+          _routingManager.determineRoutingChangeForTables();
+        } catch (Exception e) {
+          discoveryException.set(e);
+        } finally {
+          finishLatch.countDown();
+        }
+      });
+
+      // Thread 2: Shutdown
+      Future<?> shutdownTask = executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(10);

Review Comment:
   What's the role of the sleep here? Can we avoid it? Relying on `Thread.sleep` to order the threads
   makes it easy to end up with flaky tests.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/MultiClusterRoutingManager.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.SegmentsToQuery;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code MultiClusterRoutingManager} implements the {@link 
RoutingManager} to support multi-cluster routing.
+ * It contains a local {@link BrokerRoutingManager} and multiple remote {@link 
RemoteClusterBrokerRoutingManager}
+ * instances. For each routing request, it first queries the local cluster 
routing manager, and then queries the remote
+ * cluster routing managers to combine the results.
+ * For example, when getting the routing table for a table, it first gets the 
routing table from the local cluster
+ * routing manager, and then gets the routing tables from the remote cluster 
routing managers to merge into a combined
+ * routing table.
+ */
+public class MultiClusterRoutingManager implements RoutingManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiClusterRoutingManager.class);
+
+  private final BrokerRoutingManager _localClusterRoutingManager;
+  private final List<RemoteClusterBrokerRoutingManager> 
_remoteClusterRoutingManagers;
+
+  public MultiClusterRoutingManager(BrokerRoutingManager 
localClusterRoutingManager,
+      List<RemoteClusterBrokerRoutingManager> remoteClusterRoutingManagers) {
+    _localClusterRoutingManager = localClusterRoutingManager;
+    _remoteClusterRoutingManagers = remoteClusterRoutingManagers;
+  }
+
+  @Nullable
+  private <T> T findFirst(Function<BrokerRoutingManager, T> getter, String 
tableNameForLog) {
+    T result = getter.apply(_localClusterRoutingManager);
+    if (result != null) {
+      return result;
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        result = getter.apply(remoteCluster);
+        if (result != null) {
+          return result;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error querying remote cluster routing manager for table 
{}", tableNameForLog, e);
+      }
+    }
+    return null;
+  }
+
+  private boolean anyMatch(Predicate<BrokerRoutingManager> predicate) {
+    if (predicate.test(_localClusterRoutingManager)) {
+      return true;
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      if (predicate.test(remoteCluster)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean routingExists(String tableNameWithType) {
+    return anyMatch(mgr -> mgr.routingExists(tableNameWithType));
+  }
+
+  @Override
+  public boolean isTableDisabled(String tableNameWithType) {
+    return anyMatch(mgr -> mgr.isTableDisabled(tableNameWithType));
+  }
+
+  @Nullable
+  @Override
+  public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
+    return getRoutingTable(brokerRequest, 
brokerRequest.getQuerySource().getTableName(), requestId);
+  }
+
+  @Nullable
+  @Override
+  public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String 
tableNameWithType, long requestId) {
+    RoutingTable localTable = 
_localClusterRoutingManager.getRoutingTable(brokerRequest, tableNameWithType, 
requestId);
+    return combineRoutingTables(localTable, tableNameWithType, brokerRequest, 
requestId);
+  }
+
+  private RoutingTable combineRoutingTables(@Nullable RoutingTable localTable, 
String tableNameWithType,
+      BrokerRequest brokerRequest, long requestId) {
+    Map<ServerInstance, SegmentsToQuery> combinedMap = localTable != null
+        ? new HashMap<>(localTable.getServerInstanceToSegmentsMap()) : new 
HashMap<>();
+    List<String> unavailableSegments = localTable != null
+        ? new ArrayList<>(localTable.getUnavailableSegments()) : new 
ArrayList<>();
+    int prunedCount = localTable != null ? localTable.getNumPrunedSegments() : 
0;
+
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        RoutingTable remoteTable = 
remoteCluster.getRoutingTable(brokerRequest, tableNameWithType, requestId);
+        if (remoteTable != null) {
+          mergeRoutingTable(combinedMap, remoteTable);
+          unavailableSegments.addAll(remoteTable.getUnavailableSegments());
+          prunedCount += remoteTable.getNumPrunedSegments();
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error combining routing table for table {}", 
tableNameWithType, e);
+      }
+    }
+    return combinedMap.isEmpty() && unavailableSegments.isEmpty() ? null
+        : new RoutingTable(combinedMap, unavailableSegments, prunedCount);
+  }
+
+  private void mergeRoutingTable(Map<ServerInstance, SegmentsToQuery> target, 
RoutingTable source) {
+    for (Map.Entry<ServerInstance, SegmentsToQuery> entry : 
source.getServerInstanceToSegmentsMap().entrySet()) {
+      SegmentsToQuery existing = target.get(entry.getKey());
+      if (existing != null) {
+        existing.getSegments().addAll(entry.getValue().getSegments());
+        
existing.getOptionalSegments().addAll(entry.getValue().getOptionalSegments());
+      } else {
+        target.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  @Nullable
+  @Override
+  public TimeBoundaryInfo getTimeBoundaryInfo(String tableNameWithType) {
+    return findFirst(mgr -> mgr.getTimeBoundaryInfo(tableNameWithType), 
tableNameWithType);
+  }
+
+  @Override
+  public Map<String, ServerInstance> getEnabledServerInstanceMap() {
+    Map<String, ServerInstance> combined = new 
HashMap<>(_localClusterRoutingManager.getEnabledServerInstanceMap());
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      combined.putAll(remoteCluster.getEnabledServerInstanceMap());
+    }
+    return combined;
+  }
+
+  @Override
+  public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
+    return findFirst(mgr -> mgr.getTablePartitionInfo(tableNameWithType), 
tableNameWithType);
+  }
+
+  @Override
+  public Set<String> getServingInstances(String tableNameWithType) {
+    Set<String> combined = new HashSet<>();
+    Set<String> localInstances = 
_localClusterRoutingManager.getServingInstances(tableNameWithType);
+    if (localInstances != null) {
+      combined.addAll(localInstances);
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        Set<String> instances = 
remoteCluster.getServingInstances(tableNameWithType);
+        if (instances != null) {
+          combined.addAll(instances);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error getting serving instances for table {}", 
tableNameWithType, e);
+      }
+    }
+    return combined.isEmpty() ? null : combined;
+  }
+
+  @Override
+  public List<String> getSegments(BrokerRequest brokerRequest) {
+    List<String> combined = new ArrayList<>();
+    List<String> localSegments = 
_localClusterRoutingManager.getSegments(brokerRequest);
+    if (localSegments != null) {
+      combined.addAll(localSegments);
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        List<String> remoteSegments = remoteCluster.getSegments(brokerRequest);
+        if (remoteSegments != null) {
+          combined.addAll(remoteSegments);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error getting segments from remote cluster routing 
manager", e);

Review Comment:
   note-to-self: same note as above



##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/RemoteClusterBrokerRoutingManagerTest.java:
##########
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.controller.helix.ControllerTest;
+import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+/**
+ * Test class for {@link RemoteClusterBrokerRoutingManager}.
+ * Tests the remote cluster routing manager with real ZooKeeper to validate 
table discovery,
+ * routing updates, concurrent operations, and lifecycle management.
+ */
+public class RemoteClusterBrokerRoutingManagerTest extends ControllerTest {
+  private static final String REMOTE_CLUSTER_NAME = "remoteCluster";
+  private static final String RAW_TABLE_NAME = "remoteTable";
+  private static final String OFFLINE_TABLE = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String SERVER_INSTANCE_1 = "Server_remote_host1_9000";
+  private static final String SERVER_INSTANCE_2 = "Server_remote_host2_9000";
+
+  private BrokerMetrics _brokerMetrics;
+  private ServerRoutingStatsManager _serverRoutingStatsManager;
+  private PinotConfiguration _pinotConfig;
+  private RemoteClusterBrokerRoutingManager _routingManager;
+
+  /**
+   * Starts ZooKeeper and the controller, builds a {@link RemoteClusterBrokerRoutingManager} over a mocked
+   * {@link PinotConfiguration}, registers the test server instances and processes the instance-config change.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    startZk();
+    startController();
+
+    _brokerMetrics = Mockito.mock(BrokerMetrics.class);
+    _serverRoutingStatsManager = Mockito.mock(ServerRoutingStatsManager.class);
+    _pinotConfig = Mockito.mock(PinotConfiguration.class);
+
+    // Generic fallbacks: echo back the caller-supplied default value. These MUST be registered before the specific
+    // stubs below: Mockito consults the most recently registered matching stub first, so registering the generic
+    // getProperty(anyString(), anyInt()) stub last would shadow the parallelism override and it would never apply.
+    Mockito.when(_pinotConfig.getProperty(anyString(), anyString()))
+        .thenAnswer(invocation -> invocation.getArgument(1));
+    Mockito.when(_pinotConfig.getProperty(anyString(), anyInt()))
+        .thenAnswer(invocation -> invocation.getArgument(1));
+
+    // Specific overrides, registered after the fallbacks so they take precedence.
+    Mockito.when(_pinotConfig.getProperty(Mockito.eq("pinot.broker.adaptive.server.selector.type")))
+        .thenReturn("UNIFORM_RANDOM");
+    Mockito.when(_pinotConfig.getProperty(
+        Mockito.eq(CommonConstants.Broker.CONFIG_OF_ROUTING_ASSIGNMENT_CHANGE_PROCESS_PARALLELISM), anyInt()))
+        .thenReturn(10);
+
+    _routingManager = new RemoteClusterBrokerRoutingManager(
+        REMOTE_CLUSTER_NAME, _brokerMetrics, _serverRoutingStatsManager, _pinotConfig);
+    _routingManager.init(_helixManager);
+
+    addServerInstances();
+    triggerInstanceConfigProcessing();
+  }
+
+  /**
+   * Asks the routing manager to process an INSTANCE_CONFIG cluster change, failing the test if it throws.
+   */
+  private void triggerInstanceConfigProcessing() {
+    org.apache.helix.HelixConstants.ChangeType changeType =
+        org.apache.helix.HelixConstants.ChangeType.INSTANCE_CONFIG;
+    try {
+      _routingManager.processClusterChange(changeType);
+    } catch (Exception e) {
+      Assert.fail("Failed to process instance config", e);
+    }
+  }
+
+  /**
+   * Shuts down the routing manager (if it was created), then stops the controller and ZooKeeper.
+   */
+  @AfterClass
+  public void tearDown() {
+    // Nested finally blocks guarantee the controller and ZooKeeper are stopped even if shutting down the routing
+    // manager (or stopping the controller) throws, so resources do not leak into later test classes.
+    try {
+      if (_routingManager != null) {
+        _routingManager.shutdown();
+      }
+    } finally {
+      try {
+        stopController();
+      } finally {
+        stopZk();
+      }
+    }
+  }
+
+  /**
+   * Registers {@code SERVER_INSTANCE_1} and {@code SERVER_INSTANCE_2} in the Helix cluster as enabled instances,
+   * skipping any instance that is already present.
+   *
+   * <p>The instance ids follow the {@code Server_<host>_<port>} layout, and the host part itself contains an
+   * underscore ({@code remote_host1}). The host is therefore taken as everything between the first and the last
+   * underscore, and the port as the part after the last underscore. Splitting on every underscore and taking element
+   * 1 would yield {@code "remote"} for both ids and give the two instances the same host:port.
+   */
+  private void addServerInstances() {
+    String clusterName = getHelixClusterName();
+
+    for (String serverInstanceId : new String[]{SERVER_INSTANCE_1, SERVER_INSTANCE_2}) {
+      if (!_helixAdmin.getInstancesInCluster(clusterName).contains(serverInstanceId)) {
+        int firstSeparator = serverInstanceId.indexOf('_');
+        int lastSeparator = serverInstanceId.lastIndexOf('_');
+        InstanceConfig instanceConfig = new InstanceConfig(serverInstanceId);
+        instanceConfig.setHostName(serverInstanceId.substring(firstSeparator + 1, lastSeparator));
+        instanceConfig.setPort(serverInstanceId.substring(lastSeparator + 1));
+        instanceConfig.setInstanceEnabled(true);
+        _helixAdmin.addInstance(clusterName, instanceConfig);
+        _helixAdmin.enableInstance(clusterName, serverInstanceId, true);
+      }
+    }
+  }
+
+  @Test
+  public void testTableDiscoveryAddsNewTable() throws Exception {
+    Assert.assertFalse(_routingManager.routingExists(OFFLINE_TABLE), "Table 
should not exist initially");
+
+    createTableInZooKeeper(OFFLINE_TABLE, TableType.OFFLINE);
+
+    _routingManager.processSegmentAssignmentChangeInternal();
+    Assert.assertTrue(_routingManager.hasRoutingChangeScheduled(), "Flag 
should be set after change");
+
+    _routingManager.determineRoutingChangeForTables();
+
+    Assert.assertTrue(_routingManager.routingExists(OFFLINE_TABLE), "Table 
should be discovered and added");
+    Assert.assertFalse(_routingManager.hasRoutingChangeScheduled(), "Flag 
should be consumed");
+  }
+
+  @Test
+  public void testTableRemovalDeletesRouting() throws Exception {
+    String tableToRemove = "tableToRemove_OFFLINE";
+    createTableInZooKeeper(tableToRemove, TableType.OFFLINE);
+
+    _routingManager.processSegmentAssignmentChangeInternal();
+    _routingManager.determineRoutingChangeForTables();
+    Assert.assertTrue(_routingManager.routingExists(tableToRemove), "Table 
should exist after discovery");
+
+    deleteTableFromZooKeeper(tableToRemove);
+
+    _routingManager.processSegmentAssignmentChangeInternal();
+    _routingManager.determineRoutingChangeForTables();
+
+    Assert.assertFalse(_routingManager.routingExists(tableToRemove), "Table 
should be removed from routing");
+  }
+
+  @Test
+  public void testConcurrentTableDiscoveryAndQueries() throws Exception {
+    String table1 = "concurrentTable1_OFFLINE";
+    String table2 = "concurrentTable2_OFFLINE";
+
+    createTableInZooKeeper(table1, TableType.OFFLINE);
+    _routingManager.processSegmentAssignmentChangeInternal();
+    _routingManager.determineRoutingChangeForTables();
+    Assert.assertTrue(_routingManager.routingExists(table1));
+
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(3);
+
+    AtomicReference<Exception> discoveryException = new AtomicReference<>();
+    AtomicReference<Exception> query1Exception = new AtomicReference<>();
+    AtomicReference<Exception> query2Exception = new AtomicReference<>();
+
+    try {
+      // Thread 1: Discover new table
+      Future<?> discoveryTask = executor.submit(() -> {
+        try {
+          startLatch.await();
+          createTableInZooKeeper(table2, TableType.OFFLINE);
+          _routingManager.processSegmentAssignmentChangeInternal();
+          _routingManager.determineRoutingChangeForTables();
+        } catch (Exception e) {
+          discoveryException.set(e);
+        } finally {
+          finishLatch.countDown();
+        }
+      });
+
+      // Thread 2: Query existing table
+      Future<?> query1Task = executor.submit(() -> {
+        try {
+          startLatch.await();
+          for (int i = 0; i < 10; i++) {
+            boolean exists = _routingManager.routingExists(table1);
+            Assert.assertTrue(exists, "Existing table should remain queryable 
during discovery");
+            Thread.sleep(5);
+          }
+        } catch (Exception e) {
+          query1Exception.set(e);
+        } finally {
+          finishLatch.countDown();
+        }
+      });
+
+      // Thread 3: Query both tables
+      Future<?> query2Task = executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(20); // Let discovery happen first

Review Comment:
   Same as the other comment: relying on `Thread.sleep` to order threads usually leads to flaky tests.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/MultiClusterRoutingManager.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.SegmentsToQuery;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code MultiClusterRoutingManager} implements the {@link 
RoutingManager} to support multi-cluster routing.
+ * It contains a local {@link BrokerRoutingManager} and multiple remote {@link 
RemoteClusterBrokerRoutingManager}
+ * instances. For each routing request, it first queries the local cluster 
routing manager, and then queries the remote
+ * cluster routing managers to combine the results.
+ * For example, when getting the routing table for a table, it first gets the 
routing table from the local cluster
+ * routing manager, and then gets the routing tables from the remote cluster 
routing managers to merge into a combined
+ * routing table.
+ */
+public class MultiClusterRoutingManager implements RoutingManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiClusterRoutingManager.class);
+
+  private final BrokerRoutingManager _localClusterRoutingManager;
+  private final List<RemoteClusterBrokerRoutingManager> 
_remoteClusterRoutingManagers;
+
+  /**
+   * Creates a routing manager that consults {@code localClusterRoutingManager} first and then each entry of
+   * {@code remoteClusterRoutingManagers} in list order.
+   *
+   * @param localClusterRoutingManager routing manager of the local cluster
+   * @param remoteClusterRoutingManagers routing managers of the remote clusters; the list is stored as given, not
+   *     copied
+   */
+  public MultiClusterRoutingManager(BrokerRoutingManager localClusterRoutingManager,
+      List<RemoteClusterBrokerRoutingManager> remoteClusterRoutingManagers) {
+    this._remoteClusterRoutingManagers = remoteClusterRoutingManagers;
+    this._localClusterRoutingManager = localClusterRoutingManager;
+  }
+
+  /**
+   * Applies {@code getter} to the local routing manager and then to each remote one, returning the first non-null
+   * result. Exceptions thrown by a remote manager are logged and that manager is skipped; exceptions from the local
+   * manager propagate to the caller.
+   *
+   * @param getter lookup to run against each routing manager
+   * @param tableNameForLog table name used only in the error log message
+   * @return the first non-null result, or {@code null} if no manager produced one
+   */
+  @Nullable
+  private <T> T findFirst(Function<BrokerRoutingManager, T> getter, String tableNameForLog) {
+    T localResult = getter.apply(_localClusterRoutingManager);
+    if (localResult != null) {
+      return localResult;
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      T remoteResult;
+      try {
+        remoteResult = getter.apply(remoteCluster);
+      } catch (Exception e) {
+        LOGGER.error("Error querying remote cluster routing manager for table {}", tableNameForLog, e);
+        continue;
+      }
+      if (remoteResult != null) {
+        return remoteResult;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns {@code true} if {@code predicate} holds for the local routing manager or for any remote routing manager,
+   * checked in that order and stopping at the first match.
+   */
+  private boolean anyMatch(Predicate<BrokerRoutingManager> predicate) {
+    return anyMatch(predicate, null);
+  }
+
+  /**
+   * Same as {@link #anyMatch(Predicate)}, with a table name used for error logging.
+   *
+   * <p>Exceptions thrown by a remote routing manager are logged, and that manager is treated as not matching. One
+   * broken remote cluster therefore cannot fail the lookup or hide the remaining remote clusters. This matches how
+   * {@code findFirst} and {@code combineRoutingTables} treat remote failures. Exceptions from the local routing
+   * manager still propagate.
+   */
+  private boolean anyMatch(Predicate<BrokerRoutingManager> predicate, @Nullable String tableNameForLog) {
+    if (predicate.test(_localClusterRoutingManager)) {
+      return true;
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        if (predicate.test(remoteCluster)) {
+          return true;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error querying remote cluster routing manager for table {}", tableNameForLog, e);
+      }
+    }
+    return false;
+  }
+
+  /** Returns {@code true} if the local cluster or any remote cluster has routing for the table. */
+  @Override
+  public boolean routingExists(String tableNameWithType) {
+    return anyMatch(mgr -> mgr.routingExists(tableNameWithType), tableNameWithType);
+  }
+
+  /** Returns {@code true} if the local cluster or any remote cluster reports the table as disabled. */
+  @Override
+  public boolean isTableDisabled(String tableNameWithType) {
+    return anyMatch(mgr -> mgr.isTableDisabled(tableNameWithType), tableNameWithType);
+  }
+
+  @Nullable
+  @Override
+  public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
+    return getRoutingTable(brokerRequest, 
brokerRequest.getQuerySource().getTableName(), requestId);
+  }
+
+  /**
+   * Returns the local routing table for {@code tableNameWithType} combined with the routing tables of the remote
+   * clusters, or {@code null} when the combination has neither servers nor unavailable segments.
+   */
+  @Nullable
+  @Override
+  public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String tableNameWithType, long requestId) {
+    return combineRoutingTables(
+        _localClusterRoutingManager.getRoutingTable(brokerRequest, tableNameWithType, requestId),
+        tableNameWithType, brokerRequest, requestId);
+  }
+
+  /**
+   * Combines {@code localTable} (which may be null) with the routing table each remote cluster returns for the same
+   * table:
+   * <ul>
+   *   <li>server-to-segment mappings are merged via {@code mergeRoutingTable};</li>
+   *   <li>unavailable segments are concatenated;</li>
+   *   <li>pruned-segment counts are summed.</li>
+   * </ul>
+   * An exception while processing a remote cluster is logged and does not abort the remaining clusters.
+   *
+   * @return the combined routing table, or {@code null} if it has neither servers nor unavailable segments
+   */
+  private RoutingTable combineRoutingTables(@Nullable RoutingTable localTable, String tableNameWithType,
+      BrokerRequest brokerRequest, long requestId) {
+    Map<ServerInstance, SegmentsToQuery> combinedMap = new HashMap<>();
+    List<String> unavailableSegments = new ArrayList<>();
+    int prunedCount = 0;
+    if (localTable != null) {
+      combinedMap.putAll(localTable.getServerInstanceToSegmentsMap());
+      unavailableSegments.addAll(localTable.getUnavailableSegments());
+      prunedCount = localTable.getNumPrunedSegments();
+    }
+
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        RoutingTable remoteTable = remoteCluster.getRoutingTable(brokerRequest, tableNameWithType, requestId);
+        if (remoteTable == null) {
+          continue;
+        }
+        mergeRoutingTable(combinedMap, remoteTable);
+        unavailableSegments.addAll(remoteTable.getUnavailableSegments());
+        prunedCount += remoteTable.getNumPrunedSegments();
+      } catch (Exception e) {
+        LOGGER.error("Error combining routing table for table {}", tableNameWithType, e);
+      }
+    }
+
+    if (combinedMap.isEmpty() && unavailableSegments.isEmpty()) {
+      return null;
+    }
+    return new RoutingTable(combinedMap, unavailableSegments, prunedCount);
+  }
+
+  /**
+   * Merges the server-to-segments entries of {@code source} into {@code target}.
+   *
+   * <p>A server that is new to {@code target} gets {@code source}'s entry. A server already in {@code target} gets a
+   * new {@link SegmentsToQuery} whose segment and optional-segment lists are the concatenation of both entries.
+   * Existing entries are never modified in place, for two reasons:
+   * <ul>
+   *   <li>{@code target} may start as a shallow copy of another routing table's map, so appending to its lists would
+   *   mutate that routing table;</li>
+   *   <li>appending fails on unmodifiable lists, and the caller logs and swallows that exception, silently dropping
+   *   the remote cluster's segments.</li>
+   * </ul>
+   */
+  private void mergeRoutingTable(Map<ServerInstance, SegmentsToQuery> target, RoutingTable source) {
+    for (Map.Entry<ServerInstance, SegmentsToQuery> entry : source.getServerInstanceToSegmentsMap().entrySet()) {
+      target.merge(entry.getKey(), entry.getValue(), (existing, incoming) -> {
+        List<String> segments = new ArrayList<>(existing.getSegments());
+        segments.addAll(incoming.getSegments());
+        List<String> optionalSegments = new ArrayList<>(existing.getOptionalSegments());
+        optionalSegments.addAll(incoming.getOptionalSegments());
+        // NOTE(review): assumes SegmentsToQuery exposes a (segments, optionalSegments) constructor — confirm.
+        return new SegmentsToQuery(segments, optionalSegments);
+      });
+    }
+  }
+
+  @Nullable
+  @Override
+  public TimeBoundaryInfo getTimeBoundaryInfo(String tableNameWithType) {
+    return findFirst(mgr -> mgr.getTimeBoundaryInfo(tableNameWithType), 
tableNameWithType);
+  }
+
+  /**
+   * Returns a new map holding the enabled server instances of the local cluster and of every remote cluster. When an
+   * instance id appears in more than one cluster, the entry from the last remote cluster in list order wins.
+   */
+  @Override
+  public Map<String, ServerInstance> getEnabledServerInstanceMap() {
+    Map<String, ServerInstance> allEnabled = new HashMap<>();
+    allEnabled.putAll(_localClusterRoutingManager.getEnabledServerInstanceMap());
+    _remoteClusterRoutingManagers.forEach(remote -> allEnabled.putAll(remote.getEnabledServerInstanceMap()));
+    return allEnabled;
+  }
+
+  @Override
+  public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
+    return findFirst(mgr -> mgr.getTablePartitionInfo(tableNameWithType), 
tableNameWithType);
+  }
+
+  @Override
+  public Set<String> getServingInstances(String tableNameWithType) {
+    Set<String> combined = new HashSet<>();
+    Set<String> localInstances = 
_localClusterRoutingManager.getServingInstances(tableNameWithType);
+    if (localInstances != null) {
+      combined.addAll(localInstances);
+    }
+    for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) {
+      try {
+        Set<String> instances = 
remoteCluster.getServingInstances(tableNameWithType);
+        if (instances != null) {
+          combined.addAll(instances);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error getting serving instances for table {}", 
tableNameWithType, e);

Review Comment:
   note-to-self: this method is currently only used in WorkerManager, which the MSE uses for queries that do not 
use the physical optimizer. By its definition, this method may return a partial list of servers, so this should 
be okay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to