ankitsultana commented on code in PR #17296: URL: https://github.com/apache/pinot/pull/17296#discussion_r2629189233
########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/MultiClusterRoutingManagerTest.java: ########## @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.SegmentsToQuery; +import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo; +import org.apache.pinot.core.transport.ServerInstance; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; 
+import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +/** + * Unit tests for {@link MultiClusterRoutingManager}. + */ +public class MultiClusterRoutingManagerTest { + private static final String TEST_TABLE = "testTable_OFFLINE"; + private static final long REQUEST_ID = 12345L; + + @Mock + private BrokerRoutingManager _localClusterRoutingManager; + + @Mock + private RemoteClusterBrokerRoutingManager _remoteClusterRoutingManager1; + + @Mock + private RemoteClusterBrokerRoutingManager _remoteClusterRoutingManager2; + + private MultiClusterRoutingManager _multiClusterRoutingManager; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.openMocks(this); + + List<RemoteClusterBrokerRoutingManager> remoteClusterManagers = Arrays.asList( + _remoteClusterRoutingManager1, _remoteClusterRoutingManager2); + _multiClusterRoutingManager = new MultiClusterRoutingManager( + _localClusterRoutingManager, remoteClusterManagers); + } + + @Test + public void testRoutingExistsShortCircuits() { + when(_localClusterRoutingManager.routingExists(TEST_TABLE)).thenReturn(true); + + boolean exists = _multiClusterRoutingManager.routingExists(TEST_TABLE); + + assertTrue(exists); + verify(_remoteClusterRoutingManager1, never()).routingExists(anyString()); + } + + @Test + public void testRoutingExistsChecksRemote() { + when(_localClusterRoutingManager.routingExists(TEST_TABLE)).thenReturn(false); + when(_remoteClusterRoutingManager1.routingExists(TEST_TABLE)).thenReturn(false); + when(_remoteClusterRoutingManager2.routingExists(TEST_TABLE)).thenReturn(true); + + boolean exists = _multiClusterRoutingManager.routingExists(TEST_TABLE); + + assertTrue(exists); + } + + @Test + public void testGetRoutingTableCombinesLocalAndRemote() { + BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE); + + RoutingTable localTable = 
createRoutingTable("localServer", Arrays.asList("seg1")); + RoutingTable remoteTable = createRoutingTable("remoteServer", Arrays.asList("seg2")); + + when(_localClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(localTable); + when(_remoteClusterRoutingManager1.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(remoteTable); + when(_remoteClusterRoutingManager2.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(null); + + RoutingTable result = _multiClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID); + + assertNotNull(result); + assertEquals(result.getServerInstanceToSegmentsMap().size(), 2); + } + + @Test + public void testGetRoutingTableHandlesRemoteException() { + BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE); + RoutingTable localTable = createRoutingTable("localServer", Arrays.asList("seg1")); + + when(_localClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(localTable); + when(_remoteClusterRoutingManager1.getRoutingTable(any(), anyString(), anyLong())) + .thenThrow(new RuntimeException("Remote error")); + when(_remoteClusterRoutingManager2.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(null); + + RoutingTable result = _multiClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID); + + assertNotNull(result); + assertEquals(result.getServerInstanceToSegmentsMap().size(), 1); + } + + @Test + public void testGetRoutingTableReturnsNullWhenAllNull() { + BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE); + when(_localClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(null); + when(_remoteClusterRoutingManager1.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(null); + when(_remoteClusterRoutingManager2.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID)) + .thenReturn(null); + + 
RoutingTable result = _multiClusterRoutingManager.getRoutingTable(brokerRequest, TEST_TABLE, REQUEST_ID); + + assertNull(result); + } + + @Test + public void testGetTimeBoundaryInfoFindsFirst() { + TimeBoundaryInfo timeBoundaryInfo = mock(TimeBoundaryInfo.class); + when(_localClusterRoutingManager.getTimeBoundaryInfo(TEST_TABLE)).thenReturn(null); + when(_remoteClusterRoutingManager1.getTimeBoundaryInfo(TEST_TABLE)).thenReturn(timeBoundaryInfo); + + TimeBoundaryInfo result = _multiClusterRoutingManager.getTimeBoundaryInfo(TEST_TABLE); + + assertNotNull(result); + assertEquals(result, timeBoundaryInfo); + verify(_remoteClusterRoutingManager2, never()).getTimeBoundaryInfo(anyString()); + } + + @Test + public void testGetServingInstancesCombinesAll() { + Set<String> localInstances = new HashSet<>(Arrays.asList("server1")); + Set<String> remoteInstances = new HashSet<>(Arrays.asList("server2")); + + when(_localClusterRoutingManager.getServingInstances(TEST_TABLE)).thenReturn(localInstances); + when(_remoteClusterRoutingManager1.getServingInstances(TEST_TABLE)).thenReturn(remoteInstances); + when(_remoteClusterRoutingManager2.getServingInstances(TEST_TABLE)).thenReturn(null); + + Set<String> result = _multiClusterRoutingManager.getServingInstances(TEST_TABLE); + + assertNotNull(result); + assertEquals(result.size(), 2); + assertTrue(result.contains("server1")); + assertTrue(result.contains("server2")); + } + + @Test + public void testGetServingInstancesHandlesException() { + Set<String> localInstances = new HashSet<>(Arrays.asList("server1")); + when(_localClusterRoutingManager.getServingInstances(TEST_TABLE)).thenReturn(localInstances); + when(_remoteClusterRoutingManager1.getServingInstances(TEST_TABLE)) + .thenThrow(new RuntimeException("Error")); + + Set<String> result = _multiClusterRoutingManager.getServingInstances(TEST_TABLE); + + assertNotNull(result); + assertEquals(result.size(), 1); + } + + @Test + public void testGetSegmentsHandlesNulls() { + BrokerRequest 
brokerRequest = createMockBrokerRequest(TEST_TABLE); + List<String> remoteSegments = Arrays.asList("seg1"); + + when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null); + when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(remoteSegments); + when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null); + + List<String> result = _multiClusterRoutingManager.getSegments(brokerRequest); + + assertNotNull(result); + assertEquals(result.size(), 1); + assertTrue(result.contains("seg1")); + } + + @Test + public void testGetSegmentsReturnsNullWhenAllNull() { + BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE); + when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null); + when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(null); + when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null); + + List<String> result = _multiClusterRoutingManager.getSegments(brokerRequest); + + assertNull(result); + } + + @Test + public void testGetEnabledServerInstanceMapCombinesAll() { Review Comment: Interesting: instanceId is guaranteed to be unique only within a cluster. Do we need to think about this? What would happen if there's a collision across clusters? Maybe we can add a scope prefix like `cluster-name::instance-id`. ########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/RemoteClusterBrokerRoutingManagerTest.java: ########## @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; + +/** + * Test class for {@link 
RemoteClusterBrokerRoutingManager}. + * Tests the remote cluster routing manager with real ZooKeeper to validate table discovery, + * routing updates, concurrent operations, and lifecycle management. + */ +public class RemoteClusterBrokerRoutingManagerTest extends ControllerTest { + private static final String REMOTE_CLUSTER_NAME = "remoteCluster"; + private static final String RAW_TABLE_NAME = "remoteTable"; + private static final String OFFLINE_TABLE = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); + private static final String SERVER_INSTANCE_1 = "Server_remote_host1_9000"; + private static final String SERVER_INSTANCE_2 = "Server_remote_host2_9000"; + + private BrokerMetrics _brokerMetrics; + private ServerRoutingStatsManager _serverRoutingStatsManager; + private PinotConfiguration _pinotConfig; + private RemoteClusterBrokerRoutingManager _routingManager; + + @BeforeClass + public void setUp() throws Exception { + startZk(); + startController(); + + _brokerMetrics = Mockito.mock(BrokerMetrics.class); + _serverRoutingStatsManager = Mockito.mock(ServerRoutingStatsManager.class); + _pinotConfig = Mockito.mock(PinotConfiguration.class); + + Mockito.when(_pinotConfig.getProperty(Mockito.eq("pinot.broker.adaptive.server.selector.type"))) + .thenReturn("UNIFORM_RANDOM"); + Mockito.when(_pinotConfig.getProperty( + Mockito.eq(CommonConstants.Broker.CONFIG_OF_ROUTING_ASSIGNMENT_CHANGE_PROCESS_PARALLELISM), anyInt())) + .thenReturn(10); + Mockito.when(_pinotConfig.getProperty(anyString(), anyString())) + .thenAnswer(invocation -> invocation.getArgument(1)); + Mockito.when(_pinotConfig.getProperty(anyString(), anyInt())) + .thenAnswer(invocation -> invocation.getArgument(1)); + + _routingManager = new RemoteClusterBrokerRoutingManager( + REMOTE_CLUSTER_NAME, _brokerMetrics, _serverRoutingStatsManager, _pinotConfig); + _routingManager.init(_helixManager); + + addServerInstances(); + triggerInstanceConfigProcessing(); + } + + private void 
triggerInstanceConfigProcessing() { + try { + _routingManager.processClusterChange(org.apache.helix.HelixConstants.ChangeType.INSTANCE_CONFIG); + } catch (Exception e) { + Assert.fail("Failed to process instance config", e); + } + } + + @AfterClass + public void tearDown() { + if (_routingManager != null) { + _routingManager.shutdown(); + } + stopController(); + stopZk(); + } + + private void addServerInstances() { + String clusterName = getHelixClusterName(); + + for (String serverInstanceId : new String[]{SERVER_INSTANCE_1, SERVER_INSTANCE_2}) { + if (!_helixAdmin.getInstancesInCluster(clusterName).contains(serverInstanceId)) { + InstanceConfig instanceConfig = new InstanceConfig(serverInstanceId); + instanceConfig.setHostName(serverInstanceId.split("_")[1]); + instanceConfig.setPort("9000"); + instanceConfig.setInstanceEnabled(true); + _helixAdmin.addInstance(clusterName, instanceConfig); + _helixAdmin.enableInstance(clusterName, serverInstanceId, true); + } + } + } + + @Test + public void testTableDiscoveryAddsNewTable() throws Exception { + Assert.assertFalse(_routingManager.routingExists(OFFLINE_TABLE), "Table should not exist initially"); + + createTableInZooKeeper(OFFLINE_TABLE, TableType.OFFLINE); + + _routingManager.processSegmentAssignmentChangeInternal(); + Assert.assertTrue(_routingManager.hasRoutingChangeScheduled(), "Flag should be set after change"); + + _routingManager.determineRoutingChangeForTables(); + + Assert.assertTrue(_routingManager.routingExists(OFFLINE_TABLE), "Table should be discovered and added"); + Assert.assertFalse(_routingManager.hasRoutingChangeScheduled(), "Flag should be consumed"); + } + + @Test + public void testTableRemovalDeletesRouting() throws Exception { + String tableToRemove = "tableToRemove_OFFLINE"; + createTableInZooKeeper(tableToRemove, TableType.OFFLINE); + + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + 
Assert.assertTrue(_routingManager.routingExists(tableToRemove), "Table should exist after discovery"); + + deleteTableFromZooKeeper(tableToRemove); + + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + + Assert.assertFalse(_routingManager.routingExists(tableToRemove), "Table should be removed from routing"); + } + + @Test + public void testConcurrentTableDiscoveryAndQueries() throws Exception { + String table1 = "concurrentTable1_OFFLINE"; + String table2 = "concurrentTable2_OFFLINE"; + + createTableInZooKeeper(table1, TableType.OFFLINE); + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + Assert.assertTrue(_routingManager.routingExists(table1)); + + ExecutorService executor = Executors.newFixedThreadPool(3); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(3); + + AtomicReference<Exception> discoveryException = new AtomicReference<>(); + AtomicReference<Exception> query1Exception = new AtomicReference<>(); + AtomicReference<Exception> query2Exception = new AtomicReference<>(); + + try { + // Thread 1: Discover new table + Future<?> discoveryTask = executor.submit(() -> { + try { + startLatch.await(); + createTableInZooKeeper(table2, TableType.OFFLINE); + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + } catch (Exception e) { + discoveryException.set(e); + } finally { + finishLatch.countDown(); + } + }); + + // Thread 2: Query existing table + Future<?> query1Task = executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < 10; i++) { + boolean exists = _routingManager.routingExists(table1); + Assert.assertTrue(exists, "Existing table should remain queryable during discovery"); + Thread.sleep(5); + } + } catch (Exception e) { + query1Exception.set(e); + } finally { + finishLatch.countDown(); + } + }); + + // 
Thread 3: Query both tables + Future<?> query2Task = executor.submit(() -> { + try { + startLatch.await(); + Thread.sleep(20); // Let discovery happen first + for (int i = 0; i < 5; i++) { + _routingManager.routingExists(table1); + _routingManager.routingExists(table2); + Thread.sleep(10); + } + } catch (Exception e) { + query2Exception.set(e); + } finally { + finishLatch.countDown(); + } + }); + + startLatch.countDown(); + Assert.assertTrue(finishLatch.await(10, TimeUnit.SECONDS), "Tasks should complete"); + + if (discoveryException.get() != null) { + Assert.fail("Discovery failed", discoveryException.get()); + } + if (query1Exception.get() != null) { + Assert.fail("Query 1 failed", query1Exception.get()); + } + if (query2Exception.get() != null) { + Assert.fail("Query 2 failed", query2Exception.get()); + } + + Assert.assertTrue(_routingManager.routingExists(table1), "Table 1 should exist"); + Assert.assertTrue(_routingManager.routingExists(table2), "Table 2 should be discovered"); + } finally { + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testMultipleTableDiscoveryInParallel() throws Exception { + String[] tables = { + "parallelTable1_OFFLINE", + "parallelTable2_OFFLINE", + "parallelTable3_OFFLINE" + }; + + ExecutorService executor = Executors.newFixedThreadPool(3); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(3); + + AtomicReference<Exception> exception = new AtomicReference<>(); + + try { + for (int i = 0; i < tables.length; i++) { + final String tableName = tables[i]; + executor.submit(() -> { + try { + startLatch.await(); + createTableInZooKeeper(tableName, TableType.OFFLINE); + } catch (Exception e) { + exception.set(e); + } finally { + finishLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue(finishLatch.await(10, TimeUnit.SECONDS), "Table creation should complete"); + + if (exception.get() != 
null) { + Assert.fail("Table creation failed", exception.get()); + } + + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + + for (String tableName : tables) { + Assert.assertTrue(_routingManager.routingExists(tableName), + "Table " + tableName + " should be discovered"); + } + } finally { + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testRepeatedDiscoveryCalls() throws Exception { + String testTable = "repeatedTable_OFFLINE"; + createTableInZooKeeper(testTable, TableType.OFFLINE); + + for (int i = 0; i < 5; i++) { + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + } + + Assert.assertTrue(_routingManager.routingExists(testTable), + "Table should be discovered after repeated calls"); + } + + @Test + public void testShutdownDuringDiscovery() throws Exception { + String testTable = "shutdownTable_OFFLINE"; + createTableInZooKeeper(testTable, TableType.OFFLINE); + + ExecutorService executor = Executors.newFixedThreadPool(2); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(2); + + AtomicReference<Exception> discoveryException = new AtomicReference<>(); + AtomicReference<Exception> shutdownException = new AtomicReference<>(); + + try { + // Thread 1: Start discovery + Future<?> discoveryTask = executor.submit(() -> { + try { + startLatch.await(); + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + } catch (Exception e) { + discoveryException.set(e); + } finally { + finishLatch.countDown(); + } + }); + + // Thread 2: Shutdown + Future<?> shutdownTask = executor.submit(() -> { + try { + startLatch.await(); + Thread.sleep(10); Review Comment: What's the role of the sleep here? Can we avoid it? 
(It's easy to run into flaky tests this way.) ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/MultiClusterRoutingManager.java: ########## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.SegmentsToQuery; +import org.apache.pinot.core.routing.TablePartitionInfo; +import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo; +import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo; +import org.apache.pinot.core.transport.ServerInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The {@code MultiClusterRoutingManager} implements the {@link RoutingManager} to support multi-cluster routing. 
+ * It contains a local {@link BrokerRoutingManager} and multiple remote {@link RemoteClusterBrokerRoutingManager} + * instances. For each routing request, it first queries the local cluster routing manager, and then queries the remote + * cluster routing managers to combine the results. + * For example, when getting the routing table for a table, it first gets the routing table from the local cluster + * routing manager, and then gets the routing tables from the remote cluster routing managers to merge into a combined + * routing table. + */ +public class MultiClusterRoutingManager implements RoutingManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterRoutingManager.class); + + private final BrokerRoutingManager _localClusterRoutingManager; + private final List<RemoteClusterBrokerRoutingManager> _remoteClusterRoutingManagers; + + public MultiClusterRoutingManager(BrokerRoutingManager localClusterRoutingManager, + List<RemoteClusterBrokerRoutingManager> remoteClusterRoutingManagers) { + _localClusterRoutingManager = localClusterRoutingManager; + _remoteClusterRoutingManagers = remoteClusterRoutingManagers; + } + + @Nullable + private <T> T findFirst(Function<BrokerRoutingManager, T> getter, String tableNameForLog) { + T result = getter.apply(_localClusterRoutingManager); + if (result != null) { + return result; + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + result = getter.apply(remoteCluster); + if (result != null) { + return result; + } + } catch (Exception e) { + LOGGER.error("Error querying remote cluster routing manager for table {}", tableNameForLog, e); + } + } + return null; + } + + private boolean anyMatch(Predicate<BrokerRoutingManager> predicate) { + if (predicate.test(_localClusterRoutingManager)) { + return true; + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + if (predicate.test(remoteCluster)) { + return true; + } + } + return false; + } + + 
@Override + public boolean routingExists(String tableNameWithType) { + return anyMatch(mgr -> mgr.routingExists(tableNameWithType)); + } + + @Override + public boolean isTableDisabled(String tableNameWithType) { + return anyMatch(mgr -> mgr.isTableDisabled(tableNameWithType)); + } + + @Nullable + @Override + public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) { + return getRoutingTable(brokerRequest, brokerRequest.getQuerySource().getTableName(), requestId); + } + + @Nullable + @Override + public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String tableNameWithType, long requestId) { + RoutingTable localTable = _localClusterRoutingManager.getRoutingTable(brokerRequest, tableNameWithType, requestId); + return combineRoutingTables(localTable, tableNameWithType, brokerRequest, requestId); + } + + private RoutingTable combineRoutingTables(@Nullable RoutingTable localTable, String tableNameWithType, + BrokerRequest brokerRequest, long requestId) { + Map<ServerInstance, SegmentsToQuery> combinedMap = localTable != null + ? new HashMap<>(localTable.getServerInstanceToSegmentsMap()) : new HashMap<>(); + List<String> unavailableSegments = localTable != null + ? new ArrayList<>(localTable.getUnavailableSegments()) : new ArrayList<>(); + int prunedCount = localTable != null ? localTable.getNumPrunedSegments() : 0; + + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + RoutingTable remoteTable = remoteCluster.getRoutingTable(brokerRequest, tableNameWithType, requestId); + if (remoteTable != null) { + mergeRoutingTable(combinedMap, remoteTable); + unavailableSegments.addAll(remoteTable.getUnavailableSegments()); + prunedCount += remoteTable.getNumPrunedSegments(); + } + } catch (Exception e) { + LOGGER.error("Error combining routing table for table {}", tableNameWithType, e); + } + } + return combinedMap.isEmpty() && unavailableSegments.isEmpty() ? 
null + : new RoutingTable(combinedMap, unavailableSegments, prunedCount); + } + + private void mergeRoutingTable(Map<ServerInstance, SegmentsToQuery> target, RoutingTable source) { + for (Map.Entry<ServerInstance, SegmentsToQuery> entry : source.getServerInstanceToSegmentsMap().entrySet()) { + SegmentsToQuery existing = target.get(entry.getKey()); + if (existing != null) { + existing.getSegments().addAll(entry.getValue().getSegments()); + existing.getOptionalSegments().addAll(entry.getValue().getOptionalSegments()); + } else { + target.put(entry.getKey(), entry.getValue()); + } + } + } + + @Nullable + @Override + public TimeBoundaryInfo getTimeBoundaryInfo(String tableNameWithType) { + return findFirst(mgr -> mgr.getTimeBoundaryInfo(tableNameWithType), tableNameWithType); + } + + @Override + public Map<String, ServerInstance> getEnabledServerInstanceMap() { + Map<String, ServerInstance> combined = new HashMap<>(_localClusterRoutingManager.getEnabledServerInstanceMap()); + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + combined.putAll(remoteCluster.getEnabledServerInstanceMap()); + } + return combined; + } + + @Override + public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) { + return findFirst(mgr -> mgr.getTablePartitionInfo(tableNameWithType), tableNameWithType); + } + + @Override + public Set<String> getServingInstances(String tableNameWithType) { + Set<String> combined = new HashSet<>(); + Set<String> localInstances = _localClusterRoutingManager.getServingInstances(tableNameWithType); + if (localInstances != null) { + combined.addAll(localInstances); + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + Set<String> instances = remoteCluster.getServingInstances(tableNameWithType); + if (instances != null) { + combined.addAll(instances); + } + } catch (Exception e) { + LOGGER.error("Error getting serving instances for table {}", tableNameWithType, e); + } + } + return 
combined.isEmpty() ? null : combined; + } + + @Override + public List<String> getSegments(BrokerRequest brokerRequest) { + List<String> combined = new ArrayList<>(); + List<String> localSegments = _localClusterRoutingManager.getSegments(brokerRequest); + if (localSegments != null) { + combined.addAll(localSegments); + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + List<String> remoteSegments = remoteCluster.getSegments(brokerRequest); + if (remoteSegments != null) { + combined.addAll(remoteSegments); + } + } catch (Exception e) { + LOGGER.error("Error getting segments from remote cluster routing manager", e); Review Comment: note-to-self: same note as above ########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/RemoteClusterBrokerRoutingManagerTest.java: ########## @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.broker.routing; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; + +/** + * Test class for {@link RemoteClusterBrokerRoutingManager}. + * Tests the remote cluster routing manager with real ZooKeeper to validate table discovery, + * routing updates, concurrent operations, and lifecycle management. 
+ */ +public class RemoteClusterBrokerRoutingManagerTest extends ControllerTest { + private static final String REMOTE_CLUSTER_NAME = "remoteCluster"; + private static final String RAW_TABLE_NAME = "remoteTable"; + private static final String OFFLINE_TABLE = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); + private static final String SERVER_INSTANCE_1 = "Server_remote_host1_9000"; + private static final String SERVER_INSTANCE_2 = "Server_remote_host2_9000"; + + private BrokerMetrics _brokerMetrics; + private ServerRoutingStatsManager _serverRoutingStatsManager; + private PinotConfiguration _pinotConfig; + private RemoteClusterBrokerRoutingManager _routingManager; + + @BeforeClass + public void setUp() throws Exception { + startZk(); + startController(); + + _brokerMetrics = Mockito.mock(BrokerMetrics.class); + _serverRoutingStatsManager = Mockito.mock(ServerRoutingStatsManager.class); + _pinotConfig = Mockito.mock(PinotConfiguration.class); + + Mockito.when(_pinotConfig.getProperty(Mockito.eq("pinot.broker.adaptive.server.selector.type"))) + .thenReturn("UNIFORM_RANDOM"); + Mockito.when(_pinotConfig.getProperty( + Mockito.eq(CommonConstants.Broker.CONFIG_OF_ROUTING_ASSIGNMENT_CHANGE_PROCESS_PARALLELISM), anyInt())) + .thenReturn(10); + Mockito.when(_pinotConfig.getProperty(anyString(), anyString())) + .thenAnswer(invocation -> invocation.getArgument(1)); + Mockito.when(_pinotConfig.getProperty(anyString(), anyInt())) + .thenAnswer(invocation -> invocation.getArgument(1)); + + _routingManager = new RemoteClusterBrokerRoutingManager( + REMOTE_CLUSTER_NAME, _brokerMetrics, _serverRoutingStatsManager, _pinotConfig); + _routingManager.init(_helixManager); + + addServerInstances(); + triggerInstanceConfigProcessing(); + } + + private void triggerInstanceConfigProcessing() { + try { + _routingManager.processClusterChange(org.apache.helix.HelixConstants.ChangeType.INSTANCE_CONFIG); + } catch (Exception e) { + Assert.fail("Failed to process instance config", 
e); + } + } + + @AfterClass + public void tearDown() { + if (_routingManager != null) { + _routingManager.shutdown(); + } + stopController(); + stopZk(); + } + + private void addServerInstances() { + String clusterName = getHelixClusterName(); + + for (String serverInstanceId : new String[]{SERVER_INSTANCE_1, SERVER_INSTANCE_2}) { + if (!_helixAdmin.getInstancesInCluster(clusterName).contains(serverInstanceId)) { + InstanceConfig instanceConfig = new InstanceConfig(serverInstanceId); + instanceConfig.setHostName(serverInstanceId.split("_")[1]); + instanceConfig.setPort("9000"); + instanceConfig.setInstanceEnabled(true); + _helixAdmin.addInstance(clusterName, instanceConfig); + _helixAdmin.enableInstance(clusterName, serverInstanceId, true); + } + } + } + + @Test + public void testTableDiscoveryAddsNewTable() throws Exception { + Assert.assertFalse(_routingManager.routingExists(OFFLINE_TABLE), "Table should not exist initially"); + + createTableInZooKeeper(OFFLINE_TABLE, TableType.OFFLINE); + + _routingManager.processSegmentAssignmentChangeInternal(); + Assert.assertTrue(_routingManager.hasRoutingChangeScheduled(), "Flag should be set after change"); + + _routingManager.determineRoutingChangeForTables(); + + Assert.assertTrue(_routingManager.routingExists(OFFLINE_TABLE), "Table should be discovered and added"); + Assert.assertFalse(_routingManager.hasRoutingChangeScheduled(), "Flag should be consumed"); + } + + @Test + public void testTableRemovalDeletesRouting() throws Exception { + String tableToRemove = "tableToRemove_OFFLINE"; + createTableInZooKeeper(tableToRemove, TableType.OFFLINE); + + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + Assert.assertTrue(_routingManager.routingExists(tableToRemove), "Table should exist after discovery"); + + deleteTableFromZooKeeper(tableToRemove); + + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + + 
Assert.assertFalse(_routingManager.routingExists(tableToRemove), "Table should be removed from routing"); + } + + @Test + public void testConcurrentTableDiscoveryAndQueries() throws Exception { + String table1 = "concurrentTable1_OFFLINE"; + String table2 = "concurrentTable2_OFFLINE"; + + createTableInZooKeeper(table1, TableType.OFFLINE); + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + Assert.assertTrue(_routingManager.routingExists(table1)); + + ExecutorService executor = Executors.newFixedThreadPool(3); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(3); + + AtomicReference<Exception> discoveryException = new AtomicReference<>(); + AtomicReference<Exception> query1Exception = new AtomicReference<>(); + AtomicReference<Exception> query2Exception = new AtomicReference<>(); + + try { + // Thread 1: Discover new table + Future<?> discoveryTask = executor.submit(() -> { + try { + startLatch.await(); + createTableInZooKeeper(table2, TableType.OFFLINE); + _routingManager.processSegmentAssignmentChangeInternal(); + _routingManager.determineRoutingChangeForTables(); + } catch (Exception e) { + discoveryException.set(e); + } finally { + finishLatch.countDown(); + } + }); + + // Thread 2: Query existing table + Future<?> query1Task = executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < 10; i++) { + boolean exists = _routingManager.routingExists(table1); + Assert.assertTrue(exists, "Existing table should remain queryable during discovery"); + Thread.sleep(5); + } + } catch (Exception e) { + query1Exception.set(e); + } finally { + finishLatch.countDown(); + } + }); + + // Thread 3: Query both tables + Future<?> query2Task = executor.submit(() -> { + try { + startLatch.await(); + Thread.sleep(20); // Let discovery happen first Review Comment: same as other comment. 
this usually leads to flaky tests ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/MultiClusterRoutingManager.java: ########## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.SegmentsToQuery; +import org.apache.pinot.core.routing.TablePartitionInfo; +import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo; +import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo; +import org.apache.pinot.core.transport.ServerInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The {@code MultiClusterRoutingManager} implements the {@link RoutingManager} to support multi-cluster routing. 
+ * It contains a local {@link BrokerRoutingManager} and multiple remote {@link RemoteClusterBrokerRoutingManager} + * instances. For each routing request, it first queries the local cluster routing manager, and then queries the remote + * cluster routing managers to combine the results. + * For example, when getting the routing table for a table, it first gets the routing table from the local cluster + * routing manager, and then gets the routing tables from the remote cluster routing managers to merge into a combined + * routing table. + */ +public class MultiClusterRoutingManager implements RoutingManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterRoutingManager.class); + + private final BrokerRoutingManager _localClusterRoutingManager; + private final List<RemoteClusterBrokerRoutingManager> _remoteClusterRoutingManagers; + + public MultiClusterRoutingManager(BrokerRoutingManager localClusterRoutingManager, + List<RemoteClusterBrokerRoutingManager> remoteClusterRoutingManagers) { + _localClusterRoutingManager = localClusterRoutingManager; + _remoteClusterRoutingManagers = remoteClusterRoutingManagers; + } + + @Nullable + private <T> T findFirst(Function<BrokerRoutingManager, T> getter, String tableNameForLog) { + T result = getter.apply(_localClusterRoutingManager); + if (result != null) { + return result; + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + result = getter.apply(remoteCluster); + if (result != null) { + return result; + } + } catch (Exception e) { + LOGGER.error("Error querying remote cluster routing manager for table {}", tableNameForLog, e); + } + } + return null; + } + + private boolean anyMatch(Predicate<BrokerRoutingManager> predicate) { + if (predicate.test(_localClusterRoutingManager)) { + return true; + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + if (predicate.test(remoteCluster)) { + return true; + } + } + return false; + } + + 
@Override + public boolean routingExists(String tableNameWithType) { + return anyMatch(mgr -> mgr.routingExists(tableNameWithType)); + } + + @Override + public boolean isTableDisabled(String tableNameWithType) { + return anyMatch(mgr -> mgr.isTableDisabled(tableNameWithType)); + } + + @Nullable + @Override + public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) { + return getRoutingTable(brokerRequest, brokerRequest.getQuerySource().getTableName(), requestId); + } + + @Nullable + @Override + public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String tableNameWithType, long requestId) { + RoutingTable localTable = _localClusterRoutingManager.getRoutingTable(brokerRequest, tableNameWithType, requestId); + return combineRoutingTables(localTable, tableNameWithType, brokerRequest, requestId); + } + + private RoutingTable combineRoutingTables(@Nullable RoutingTable localTable, String tableNameWithType, + BrokerRequest brokerRequest, long requestId) { + Map<ServerInstance, SegmentsToQuery> combinedMap = localTable != null + ? new HashMap<>(localTable.getServerInstanceToSegmentsMap()) : new HashMap<>(); + List<String> unavailableSegments = localTable != null + ? new ArrayList<>(localTable.getUnavailableSegments()) : new ArrayList<>(); + int prunedCount = localTable != null ? localTable.getNumPrunedSegments() : 0; + + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + RoutingTable remoteTable = remoteCluster.getRoutingTable(brokerRequest, tableNameWithType, requestId); + if (remoteTable != null) { + mergeRoutingTable(combinedMap, remoteTable); + unavailableSegments.addAll(remoteTable.getUnavailableSegments()); + prunedCount += remoteTable.getNumPrunedSegments(); + } + } catch (Exception e) { + LOGGER.error("Error combining routing table for table {}", tableNameWithType, e); + } + } + return combinedMap.isEmpty() && unavailableSegments.isEmpty() ? 
null + : new RoutingTable(combinedMap, unavailableSegments, prunedCount); + } + + private void mergeRoutingTable(Map<ServerInstance, SegmentsToQuery> target, RoutingTable source) { + for (Map.Entry<ServerInstance, SegmentsToQuery> entry : source.getServerInstanceToSegmentsMap().entrySet()) { + SegmentsToQuery existing = target.get(entry.getKey()); + if (existing != null) { + existing.getSegments().addAll(entry.getValue().getSegments()); + existing.getOptionalSegments().addAll(entry.getValue().getOptionalSegments()); + } else { + target.put(entry.getKey(), entry.getValue()); + } + } + } + + @Nullable + @Override + public TimeBoundaryInfo getTimeBoundaryInfo(String tableNameWithType) { + return findFirst(mgr -> mgr.getTimeBoundaryInfo(tableNameWithType), tableNameWithType); + } + + @Override + public Map<String, ServerInstance> getEnabledServerInstanceMap() { + Map<String, ServerInstance> combined = new HashMap<>(_localClusterRoutingManager.getEnabledServerInstanceMap()); + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + combined.putAll(remoteCluster.getEnabledServerInstanceMap()); + } + return combined; + } + + @Override + public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) { + return findFirst(mgr -> mgr.getTablePartitionInfo(tableNameWithType), tableNameWithType); + } + + @Override + public Set<String> getServingInstances(String tableNameWithType) { + Set<String> combined = new HashSet<>(); + Set<String> localInstances = _localClusterRoutingManager.getServingInstances(tableNameWithType); + if (localInstances != null) { + combined.addAll(localInstances); + } + for (BrokerRoutingManager remoteCluster : _remoteClusterRoutingManagers) { + try { + Set<String> instances = remoteCluster.getServingInstances(tableNameWithType); + if (instances != null) { + combined.addAll(instances); + } + } catch (Exception e) { + LOGGER.error("Error getting serving instances for table {}", tableNameWithType, e); Review Comment: 
note-to-self: currently this method is only used in WorkerManager, which MSE uses for non-physical-optimizer queries. By its definition, this method is allowed to return a partial list of servers, so this should be okay. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
