This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 16ae07661b Handle remove build routing for logical tables. (#15862) 16ae07661b is described below commit 16ae07661b04672c7ed7e44dcbf5ca185613ef76 Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Mon May 26 19:11:21 2025 +0530 Handle remove build routing for logical tables. (#15862) --- .../broker/api/resources/PinotBrokerRouting.java | 21 ++++++++-- ...okerResourceOnlineOfflineStateModelFactory.java | 30 +++++++++----- .../pinot/broker/routing/BrokerRoutingManager.java | 47 ++++++++++++++++++++-- .../BaseLogicalTableIntegrationTest.java | 1 + ...hTwoOfflineOneRealtimeTableIntegrationTest.java | 8 +++- .../timeboundary/MinTimeBoundaryStrategy.java | 8 ++++ .../query/timeboundary/TimeBoundaryStrategy.java | 9 +++++ 7 files changed, 105 insertions(+), 19 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java index d36bfd9500..546e155cf0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java @@ -36,7 +36,9 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import org.apache.helix.HelixManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; @@ -61,6 +63,9 @@ public class PinotBrokerRouting { @Inject BrokerRoutingManager _routingManager; + @Inject + private HelixManager _helixManager; + @PUT @Produces(MediaType.TEXT_PLAIN) @Path("/routing/{tableName}") @@ -71,9 +76,13 @@ public class PinotBrokerRouting { @ApiResponse(code = 500, message = "Internal server error") }) public String buildRouting( - @ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType, + @ApiParam(value = "Table name (with type)") @PathParam("tableName") String physicalOrLogicalTableName, @Context HttpHeaders headers) { - _routingManager.buildRouting(DatabaseUtils.translateTableName(tableNameWithType, headers)); + if (ZKMetadataProvider.isLogicalTableExists(_helixManager.getHelixPropertyStore(), physicalOrLogicalTableName)) { + _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTableName); + } else { + _routingManager.buildRouting(DatabaseUtils.translateTableName(physicalOrLogicalTableName, headers)); + } return "Success"; } @@ -104,9 +113,13 @@ public class PinotBrokerRouting { @ApiResponse(code = 500, message = "Internal server error") }) public String removeRouting( - @ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType, + @ApiParam(value = "Table name (with type)") @PathParam("tableName") String physicalOrLogicalTableName, @Context HttpHeaders headers) { - _routingManager.removeRouting(DatabaseUtils.translateTableName(tableNameWithType, headers)); + if (ZKMetadataProvider.isLogicalTableExists(_helixManager.getHelixPropertyStore(), physicalOrLogicalTableName)) { + _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTableName); + } else { + _routingManager.removeRouting(DatabaseUtils.translateTableName(physicalOrLogicalTableName, headers)); + } return "Success"; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index dcf8a667e1..cc2f2f406a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -97,14 +97,19 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact @Transition(from = "ONLINE", to = "OFFLINE") public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - String tableNameWithType = message.getPartitionName(); - LOGGER.info("Processing transition from ONLINE to OFFLINE for table: {}", tableNameWithType); + String physicalOrLogicalTable = message.getPartitionName(); + LOGGER.info("Processing transition from ONLINE to OFFLINE for table: {}", physicalOrLogicalTable); try { - _routingManager.removeRouting(tableNameWithType); - _queryQuotaManager.dropTableQueryQuota(tableNameWithType); + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { + _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTable); + _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable); + } else { + _routingManager.removeRouting(physicalOrLogicalTable); + _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable); + } } catch (Exception e) { LOGGER.error("Caught exception while processing transition from ONLINE to OFFLINE for table: {}", - tableNameWithType, e); + physicalOrLogicalTable, e); throw e; } } @@ -116,14 +121,19 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact @Transition(from = "ONLINE", to = "DROPPED") public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { - String tableNameWithType = message.getPartitionName(); - LOGGER.info("Processing transition from ONLINE to DROPPED for table: {}", tableNameWithType); + String physicalOrLogicalTable = message.getPartitionName(); + LOGGER.info("Processing transition from ONLINE to DROPPED for table: {}", physicalOrLogicalTable); try { - _routingManager.removeRouting(tableNameWithType); - _queryQuotaManager.dropTableQueryQuota(tableNameWithType); + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { + _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTable); + _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable); + } else { + _routingManager.removeRouting(physicalOrLogicalTable); + _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable); + } } catch (Exception e) { LOGGER.error("Caught exception while processing transition from ONLINE to DROPPED for table: {}", - tableNameWithType, e); + physicalOrLogicalTable, e); throw e; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 801b39055b..e2c9045bef 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -66,6 +66,8 @@ import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -437,10 +439,11 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"), "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy()); - List<String> includedTables = - (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of()); + TimeBoundaryStrategy timeBoundaryStrategy = + TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(timeBoundaryConfig.getBoundaryStrategy()); + List<String> timeBoundaryTableNames = timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig); - for (String tableNameWithType : includedTables) { + for (String tableNameWithType : timeBoundaryTableNames) { Preconditions.checkArgument(TableNameBuilder.isOfflineTableResource(tableNameWithType), "Invalid table in the time boundary config: %s", tableNameWithType); try { @@ -653,6 +656,44 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle } } + public synchronized void removeRoutingForLogicalTable(String logicalTableName) { + LOGGER.info("Removing time boundary manager for logical table: {}", logicalTableName); + LogicalTableConfig logicalTableConfig = + ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s", + logicalTableName); + if (!logicalTableConfig.isHybridLogicalTable()) { + LOGGER.info("Skip removing time boundary manager for non hybrid logical table: {}", logicalTableName); + return; + } + String strategy = logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy(); + TimeBoundaryStrategy timeBoundaryStrategy = + TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(strategy); + List<String> timeBoundaryTableNames = timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig); + for (String tableNameWithType : timeBoundaryTableNames) { + + if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + LOGGER.info("Skipping removing time boundary manager for real-time table: {}", tableNameWithType); + continue; + } + + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + if (_routingEntryMap.containsKey(realtimeTableName)) { + LOGGER.info("Skipping removing time boundary manager for hybrid physical table: {}", rawTableName); + continue; + } + + RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType); + if (routingEntry != null) { + routingEntry.setTimeBoundaryManager(null); + LOGGER.info("Removed time boundary manager for table: {}", tableNameWithType); + } else { + LOGGER.warn("Routing does not exist for table: {}, skipping", tableNameWithType); + } + } + } + /** * Refreshes the metadata for the given segment (called when segment is getting refreshed). */ diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java index 3f2bffd888..29c6f05516 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java @@ -110,6 +110,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra if (_sharedClusterTestSuite != this) { _controllerRequestURLBuilder = _sharedClusterTestSuite._controllerRequestURLBuilder; _helixResourceManager = _sharedClusterTestSuite._helixResourceManager; + _kafkaStarters = _sharedClusterTestSuite._kafkaStarters; } _avroFiles = getAllAvroFiles(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java index d406b2ad56..ba594a303c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; @@ -62,8 +64,9 @@ public class LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig logicalTableConfig) throws IOException { - List<String> includedTables = - (List<String>) logicalTableConfig.getTimeBoundaryConfig().getParameters().get("includedTables"); + TimeBoundaryStrategy timeBoundaryStrategy = TimeBoundaryStrategyService.getInstance() + .getTimeBoundaryStrategy(logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy()); + List<String> includedTables = timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig); String timeBoundaryTableName = TableNameBuilder.extractRawTableName(includedTables.get(0)); String newTimeBoundaryTableName = timeBoundaryTableName.equals("o_1") ? "o_2" : "o_1"; @@ -71,6 +74,7 @@ public class LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B Map<String, Object> parameters = Map.of("includedTables", List.of(newTimeBoundaryTableName)); logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters); + logicalTableConfig.setQueryConfig(null); updateLogicalTableConfig(logicalTableConfig.getTableName(), logicalTableConfig); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java index 5d3c98596b..04be7d3cdf 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java @@ -36,6 +36,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; @AutoService(TimeBoundaryStrategy.class) public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy { + public static final String INCLUDED_TABLES = "includedTables"; + @Override public String getName() { return "min"; @@ -76,4 +78,10 @@ public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy { } return minTimeBoundaryInfo; } + + @Override + public List<String> getTimeBoundaryTableNames(LogicalTableConfig logicalTableConfig) { + Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters(); + return parameters != null ? (List) parameters.getOrDefault(INCLUDED_TABLES, List.of()) : List.of(); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java index c1b97f28c5..7a4ee21794 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.timeboundary; +import java.util.List; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.TimeBoundaryInfo; @@ -43,4 +44,12 @@ public interface TimeBoundaryStrategy { */ TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache, RoutingManager routingManager); + + + /** + * Returns the list of physical table names that are part of the time boundary. + * @param logicalTableConfig The logical table configuration + * @return The list of physical table names that are part of the time boundary. + */ + List<String> getTimeBoundaryTableNames(LogicalTableConfig logicalTableConfig); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org