This is an automated email from the ASF dual-hosted git repository. shauryachats pushed a commit to branch pr_18172 in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 1b3cdbb3c271fae04dc36439f2e1d20299436541 Author: shauryachats <[email protected]> AuthorDate: Tue May 12 02:59:03 2026 +0000 Add SQL-based multi-cluster routing table debug endpoint and reduce diff - Add `useMultiClusterRouting` query param to `/debug/routingTable/sql` and `/debug/routingTableWithOptionalSegments/sql`: when true, extracts the logical table name from the compiled SQL, looks up its physical table config, and returns per-physical-table routing via the multi-cluster routing manager - Change SQL endpoint return type to `Map<String, Map<ServerInstance, List<String>>>` for consistency with the table-name endpoints (single-entry map for non-multi-cluster case) - Extract `collectRoutingTables` helper to eliminate the if/else duplication in both table-name endpoints; rename private helper to `lookupLocalRoutingTable` to avoid shadowing the public JAX-RS endpoint method name - Add class-level Javadoc to PinotBrokerDebugTest; fix pre-existing `assertTrue(x != null)` → `assertNotNull(x)` - Add unit tests: `testSqlEndpointNonMultiClusterWrapsResultInMap` and `testSqlMultiClusterRoutingSucceedsForLogicalTable` - Add integration test `testMultiClusterRoutingTableDebugEndpoint` and `testMultiClusterSqlRoutingTableDebugEndpoint` to MultiClusterIntegrationTest Co-Authored-By: Claude Sonnet 4.6 (1M context) <[email protected]> --- .../broker/api/resources/PinotBrokerDebug.java | 75 ++++++++++++------- .../broker/api/resources/PinotBrokerDebugTest.java | 84 +++++++++++++++++++++- .../multicluster/MultiClusterIntegrationTest.java | 36 ++++++++++ 3 files changed, 169 insertions(+), 26 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index c53a8bf93e6..260bf6305b7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -159,13 +159,8 @@ public class PinotBrokerDebug { tableName = DatabaseUtils.translateTableName(tableName, headers); RoutingManager routingManager = resolveRoutingManager(useMultiClusterRouting, tableName); Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>(); - if (useMultiClusterRouting) { - getPhysicalRoutingTablesForLogical(routingManager, tableName, (tableNameWithType, routingTable) -> result.put( - tableNameWithType, removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()))); - } else { - getRoutingTable(routingManager, tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, - removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()))); - } + collectRoutingTables(routingManager, useMultiClusterRouting, tableName, (tableNameWithType, routingTable) -> + result.put(tableNameWithType, removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()))); if (!result.isEmpty()) { return result; } else { @@ -192,14 +187,8 @@ public class PinotBrokerDebug { tableName = DatabaseUtils.translateTableName(tableName, headers); RoutingManager routingManager = resolveRoutingManager(useMultiClusterRouting, tableName); Map<String, Map<ServerInstance, SegmentsToQuery>> result = new TreeMap<>(); - if (useMultiClusterRouting) { - getPhysicalRoutingTablesForLogical(routingManager, tableName, - (tableNameWithType, routingTable) -> result.put(tableNameWithType, - routingTable.getServerInstanceToSegmentsMap())); - } else { - getRoutingTable(routingManager, tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, - routingTable.getServerInstanceToSegmentsMap())); - } + collectRoutingTables(routingManager, useMultiClusterRouting, tableName, (tableNameWithType, routingTable) -> + result.put(tableNameWithType, routingTable.getServerInstanceToSegmentsMap())); if (!result.isEmpty()) { return result; } else { @@ -207,6 +196,16 @@ public class PinotBrokerDebug { } } + /** Dispatches to {@link #getPhysicalRoutingTablesForLogical} or {@link #lookupLocalRoutingTable} based on flag. */ + private void collectRoutingTables(RoutingManager routingManager, boolean useMultiClusterRouting, + String tableName, BiConsumer<String, RoutingTable> consumer) { + if (useMultiClusterRouting) { + getPhysicalRoutingTablesForLogical(routingManager, tableName, consumer); + } else { + lookupLocalRoutingTable(routingManager, tableName, consumer); + } + } + /** * For a logical table with multi-cluster routing, iterates over every physical table in the logical table config * and invokes the consumer with the per-physical-table routing result. This is needed because the underlying @@ -233,7 +232,7 @@ public class PinotBrokerDebug { } } - private void getRoutingTable(RoutingManager routingManager, String tableName, + private void lookupLocalRoutingTable(RoutingManager routingManager, String tableName, BiConsumer<String, RoutingTable> consumer) { // Use a single requestId for both OFFLINE and REALTIME routing so that replica-group selection rotates properly // for raw table names (no suffix) and stays consistent for hybrid tables. @@ -299,17 +298,31 @@ public class PinotBrokerDebug { @ApiOperation(value = "Get the routing table for a query") @ApiResponses(value = { @ApiResponse(code = 200, message = "Routing table"), + @ApiResponse(code = 400, message = "Bad request"), @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map<ServerInstance, List<String>> getRoutingTableForQuery( + public Map<String, Map<ServerInstance, List<String>>> getRoutingTableForQuery( @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, + @ApiParam(value = "Use multi-cluster routing manager instead of local") + @QueryParam("useMultiClusterRouting") boolean useMultiClusterRouting, @Context HttpHeaders httpHeaders) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); checkAccessControl(brokerRequest, httpHeaders); - RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); - if (routingTable != null) { - return removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()); + String tableName = brokerRequest.getQuerySource().getTableName(); + Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>(); + if (useMultiClusterRouting) { + RoutingManager routingManager = resolveRoutingManager(true, tableName); + getPhysicalRoutingTablesForLogical(routingManager, tableName, (tableNameWithType, routingTable) -> + result.put(tableNameWithType, removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()))); + } else { + RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); + if (routingTable != null) { + result.put(tableName, removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap())); + } + } + if (!result.isEmpty()) { + return result; } else { throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND); } @@ -322,17 +335,31 @@ public class PinotBrokerDebug { @ApiOperation(value = "Get the routing table for a query, including optional segments") @ApiResponses(value = { @ApiResponse(code = 200, message = "Routing table"), + @ApiResponse(code = 400, message = "Bad request"), @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map<ServerInstance, SegmentsToQuery> getRoutingTableForQueryWithOptionalSegments( + public Map<String, Map<ServerInstance, SegmentsToQuery>> getRoutingTableForQueryWithOptionalSegments( @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, + @ApiParam(value = "Use multi-cluster routing manager instead of local") + @QueryParam("useMultiClusterRouting") boolean useMultiClusterRouting, @Context HttpHeaders httpHeaders) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); checkAccessControl(brokerRequest, httpHeaders); - RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); - if (routingTable != null) { - return routingTable.getServerInstanceToSegmentsMap(); + String tableName = brokerRequest.getQuerySource().getTableName(); + Map<String, Map<ServerInstance, SegmentsToQuery>> result = new TreeMap<>(); + if (useMultiClusterRouting) { + RoutingManager routingManager = resolveRoutingManager(true, tableName); + getPhysicalRoutingTablesForLogical(routingManager, tableName, (tableNameWithType, routingTable) -> + result.put(tableNameWithType, routingTable.getServerInstanceToSegmentsMap())); + } else { + RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); + if (routingTable != null) { + result.put(tableName, routingTable.getServerInstanceToSegmentsMap()); + } + } + if (!result.isEmpty()) { + return result; } else { throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND); } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java index 42cc043e3f5..ba93a4a613b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotBrokerDebugTest.java @@ -24,12 +24,15 @@ import java.util.List; import java.util.Map; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.HttpHeaders; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.broker.MultiClusterRoutingContextProvider; import org.apache.pinot.broker.routing.manager.BrokerRoutingManager; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.MultiClusterRoutingContext; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; import org.mockito.ArgumentCaptor; @@ -42,10 +45,15 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; +/** + * Unit tests for {@link PinotBrokerDebug} covering request-ID consistency, multi-cluster routing dispatch, + * and the SQL-based routing endpoints (single-table and logical-table expansion). + */ public class PinotBrokerDebugTest { private PinotBrokerDebug createBrokerDebug(BrokerRoutingManager routingManager) @@ -121,8 +129,8 @@ public class PinotBrokerDebugTest { } } - assertTrue(firstRealtimeRequestId != null); - assertTrue(secondRealtimeRequestId != null); + assertNotNull(firstRealtimeRequestId); + assertNotNull(secondRealtimeRequestId); assertEquals((long) secondRealtimeRequestId, firstRealtimeRequestId + 1); } @@ -195,4 +203,76 @@ public class PinotBrokerDebugTest { verify(multiClusterManager, times(2)).getRoutingTable(any(BrokerRequest.class), anyLong()); verify(routingManager, times(0)).getRoutingTable(any(BrokerRequest.class), anyLong()); } + + /** Creates a PinotBrokerDebug with a permissive access control factory for SQL endpoint tests. */ + private PinotBrokerDebug createBrokerDebugWithAccessControl(BrokerRoutingManager routingManager, + MultiClusterRoutingContext multiClusterContext, TableCache tableCache) + throws Exception { + AccessControl accessControl = mock(AccessControl.class); + when(accessControl.hasAccess(any(), any(), any(), any())).thenReturn(true); + AccessControlFactory accessControlFactory = mock(AccessControlFactory.class); + when(accessControlFactory.create()).thenReturn(accessControl); + + PinotBrokerDebug brokerDebug = new PinotBrokerDebug(); + setField(brokerDebug, "_routingManager", routingManager); + setField(brokerDebug, "_multiClusterRoutingContextProvider", + new MultiClusterRoutingContextProvider(multiClusterContext)); + setField(brokerDebug, "_tableCache", tableCache); + setField(brokerDebug, "_accessControlFactory", accessControlFactory); + return brokerDebug; + } + + @Test + public void testSqlEndpointNonMultiClusterWrapsResultInMap() + throws Exception { + BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class); + when(routingManager.getRoutingTable(any(BrokerRequest.class), anyLong())) + .thenReturn(new RoutingTable(Collections.emptyMap(), Collections.emptyList(), 0)); + + PinotBrokerDebug brokerDebug = + createBrokerDebugWithAccessControl(routingManager, null, mock(TableCache.class)); + + Map<String, Map<ServerInstance, List<String>>> result = + brokerDebug.getRoutingTableForQuery("SELECT * FROM myTable_OFFLINE", false, (HttpHeaders) null); + + // Non-multi-cluster: single entry keyed by the table name from the SQL + assertEquals(result.size(), 1); + assertTrue(result.containsKey("myTable_OFFLINE")); + } + + @Test + public void testSqlMultiClusterRoutingSucceedsForLogicalTable() + throws Exception { + BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class); + BrokerRoutingManager multiClusterManager = mock(BrokerRoutingManager.class); + when(multiClusterManager.getRoutingTable(any(BrokerRequest.class), anyLong())) + .thenReturn(new RoutingTable(Collections.emptyMap(), Collections.emptyList(), 0)); + + MultiClusterRoutingContext context = + new MultiClusterRoutingContext(Collections.emptyMap(), routingManager, multiClusterManager, + Collections.emptySet()); + + LogicalTableConfig logicalTableConfig = new LogicalTableConfig(); + logicalTableConfig.setTableName("logicalTable"); + logicalTableConfig.setPhysicalTableConfigMap(Map.of( + "physicalTable1_OFFLINE", new PhysicalTableConfig(false), + "physicalTable2_OFFLINE", new PhysicalTableConfig(true))); + + TableCache tableCache = mock(TableCache.class); + when(tableCache.isLogicalTable("logicalTable")).thenReturn(true); + when(tableCache.getLogicalTableConfig("logicalTable")).thenReturn(logicalTableConfig); + + PinotBrokerDebug brokerDebug = createBrokerDebugWithAccessControl(routingManager, context, tableCache); + + Map<String, Map<ServerInstance, List<String>>> result = + brokerDebug.getRoutingTableForQuery("SELECT * FROM logicalTable_OFFLINE", true, (HttpHeaders) null); + + // Multi-cluster: one entry per physical table in the logical table config + assertEquals(result.size(), 2); + assertTrue(result.containsKey("physicalTable1_OFFLINE")); + assertTrue(result.containsKey("physicalTable2_OFFLINE")); + // The local routing manager is never touched for the multi-cluster SQL case + verify(routingManager, times(0)).getRoutingTable(any(BrokerRequest.class), anyLong()); + verify(multiClusterManager, times(2)).getRoutingTable(any(BrokerRequest.class), anyLong()); + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java index f07dabaf408..19a836bb15f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java @@ -21,6 +21,8 @@ package org.apache.pinot.integration.tests.multicluster; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.json.JsonMapper; import java.io.IOException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.spi.data.PhysicalTableConfig; @@ -135,6 +137,40 @@ public class MultiClusterIntegrationTest extends BaseMultiClusterIntegrationTest "Cluster 2 physical table should have at least one server"); } + @Test(groups = "query") + public void testMultiClusterSqlRoutingTableDebugEndpoint() throws Exception { + String logicalTable = getLogicalTableName(); + String brokerBase = "http://localhost:" + _cluster1._brokerPort; + + // SQL-based local routing (no multi-cluster flag): result is keyed by the table name from the SQL + String physicalTable = getPhysicalTable1InCluster1(); + String localSql = URLEncoder.encode("SELECT * FROM " + physicalTable + "_OFFLINE", StandardCharsets.UTF_8); + String localRouting = ControllerTest.sendGetRequest(brokerBase + "/debug/routingTable/sql?query=" + localSql); + JsonMapper mapper = JsonMapper.builder().build(); + JsonNode localJson = mapper.readTree(localRouting); + String physicalOfflineKey = physicalTable + "_OFFLINE"; + assertTrue(localJson.has(physicalOfflineKey), + "Local SQL routing should include physical offline table: " + localRouting); + + // SQL-based multi-cluster routing for a logical table: expands to physical tables + String multiSql = + URLEncoder.encode("SELECT * FROM " + logicalTable + "_OFFLINE", StandardCharsets.UTF_8); + String multiRouting = ControllerTest.sendGetRequest( + brokerBase + "/debug/routingTable/sql?query=" + multiSql + "&useMultiClusterRouting=true"); + JsonNode multiJson = mapper.readTree(multiRouting); + + String phys1OfflineKey = getPhysicalTable1InCluster1() + "_OFFLINE"; + String phys2OfflineKey = getPhysicalTable1InCluster2() + "_OFFLINE"; + assertTrue(multiJson.has(phys1OfflineKey), + "Multi-cluster SQL routing should include physical table from cluster 1: " + multiRouting); + assertTrue(multiJson.has(phys2OfflineKey), + "Multi-cluster SQL routing should include physical table from cluster 2: " + multiRouting); + assertTrue(multiJson.get(phys1OfflineKey).size() >= 1, + "Cluster 1 physical table should have at least one server"); + assertTrue(multiJson.get(phys2OfflineKey).size() >= 1, + "Cluster 2 physical table should have at least one server"); + } + @BeforeGroups("query") public void setupTablesForQueryTests() throws Exception { dropLogicalTableIfExists(getLogicalTableName(), _cluster1._controllerBaseApiUrl); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
