This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 16ae07661b Handle remove build routing for logical tables. (#15862)
16ae07661b is described below

commit 16ae07661b04672c7ed7e44dcbf5ca185613ef76
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Mon May 26 19:11:21 2025 +0530

    Handle remove build routing for logical tables. (#15862)
---
 .../broker/api/resources/PinotBrokerRouting.java   | 21 ++++++++--
 ...okerResourceOnlineOfflineStateModelFactory.java | 30 +++++++++-----
 .../pinot/broker/routing/BrokerRoutingManager.java | 47 ++++++++++++++++++++--
 .../BaseLogicalTableIntegrationTest.java           |  1 +
 ...hTwoOfflineOneRealtimeTableIntegrationTest.java |  8 +++-
 .../timeboundary/MinTimeBoundaryStrategy.java      |  8 ++++
 .../query/timeboundary/TimeBoundaryStrategy.java   |  9 +++++
 7 files changed, 105 insertions(+), 19 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
index d36bfd9500..546e155cf0 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
@@ -36,7 +36,9 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
@@ -61,6 +63,9 @@ public class PinotBrokerRouting {
   @Inject
   BrokerRoutingManager _routingManager;
 
+  @Inject
+  private HelixManager _helixManager;
+
   @PUT
   @Produces(MediaType.TEXT_PLAIN)
   @Path("/routing/{tableName}")
@@ -71,9 +76,13 @@ public class PinotBrokerRouting {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public String buildRouting(
-      @ApiParam(value = "Table name (with type)") @PathParam("tableName") 
String tableNameWithType,
+      @ApiParam(value = "Table name (with type)") @PathParam("tableName") 
String physicalOrLogicalTableName,
       @Context HttpHeaders headers) {
-    
_routingManager.buildRouting(DatabaseUtils.translateTableName(tableNameWithType,
 headers));
+    if 
(ZKMetadataProvider.isLogicalTableExists(_helixManager.getHelixPropertyStore(), 
physicalOrLogicalTableName)) {
+      _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTableName);
+    } else {
+      
_routingManager.buildRouting(DatabaseUtils.translateTableName(physicalOrLogicalTableName,
 headers));
+    }
     return "Success";
   }
 
@@ -104,9 +113,13 @@ public class PinotBrokerRouting {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public String removeRouting(
-      @ApiParam(value = "Table name (with type)") @PathParam("tableName") 
String tableNameWithType,
+      @ApiParam(value = "Table name (with type)") @PathParam("tableName") 
String physicalOrLogicalTableName,
       @Context HttpHeaders headers) {
-    
_routingManager.removeRouting(DatabaseUtils.translateTableName(tableNameWithType,
 headers));
+    if 
(ZKMetadataProvider.isLogicalTableExists(_helixManager.getHelixPropertyStore(), 
physicalOrLogicalTableName)) {
+      _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTableName);
+    } else {
+      
_routingManager.removeRouting(DatabaseUtils.translateTableName(physicalOrLogicalTableName,
 headers));
+    }
     return "Success";
   }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index dcf8a667e1..cc2f2f406a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -97,14 +97,19 @@ public class BrokerResourceOnlineOfflineStateModelFactory 
extends StateModelFact
 
     @Transition(from = "ONLINE", to = "OFFLINE")
     public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
-      String tableNameWithType = message.getPartitionName();
-      LOGGER.info("Processing transition from ONLINE to OFFLINE for table: 
{}", tableNameWithType);
+      String physicalOrLogicalTable = message.getPartitionName();
+      LOGGER.info("Processing transition from ONLINE to OFFLINE for table: 
{}", physicalOrLogicalTable);
       try {
-        _routingManager.removeRouting(tableNameWithType);
-        _queryQuotaManager.dropTableQueryQuota(tableNameWithType);
+        if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
physicalOrLogicalTable)) {
+          _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTable);
+          _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+        } else {
+          _routingManager.removeRouting(physicalOrLogicalTable);
+          _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+        }
       } catch (Exception e) {
         LOGGER.error("Caught exception while processing transition from ONLINE 
to OFFLINE for table: {}",
-            tableNameWithType, e);
+            physicalOrLogicalTable, e);
         throw e;
       }
     }
@@ -116,14 +121,19 @@ public class BrokerResourceOnlineOfflineStateModelFactory 
extends StateModelFact
 
     @Transition(from = "ONLINE", to = "DROPPED")
     public void onBecomeDroppedFromOnline(Message message, NotificationContext 
context) {
-      String tableNameWithType = message.getPartitionName();
-      LOGGER.info("Processing transition from ONLINE to DROPPED for table: 
{}", tableNameWithType);
+      String physicalOrLogicalTable = message.getPartitionName();
+      LOGGER.info("Processing transition from ONLINE to DROPPED for table: 
{}", physicalOrLogicalTable);
       try {
-        _routingManager.removeRouting(tableNameWithType);
-        _queryQuotaManager.dropTableQueryQuota(tableNameWithType);
+        if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
physicalOrLogicalTable)) {
+          _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTable);
+          _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+        } else {
+          _routingManager.removeRouting(physicalOrLogicalTable);
+          _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+        }
       } catch (Exception e) {
         LOGGER.error("Caught exception while processing transition from ONLINE 
to DROPPED for table: {}",
-            tableNameWithType, e);
+            physicalOrLogicalTable, e);
         throw e;
       }
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 801b39055b..e2c9045bef 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -66,6 +66,8 @@ import org.apache.pinot.core.routing.TablePartitionInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -437,10 +439,11 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     TimeBoundaryConfig timeBoundaryConfig = 
logicalTableConfig.getTimeBoundaryConfig();
     
Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
         "Invalid time boundary strategy: %s", 
timeBoundaryConfig.getBoundaryStrategy());
-    List<String> includedTables =
-        (List<String>) 
timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+    TimeBoundaryStrategy timeBoundaryStrategy =
+        
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(timeBoundaryConfig.getBoundaryStrategy());
+    List<String> timeBoundaryTableNames = 
timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig);
 
-    for (String tableNameWithType : includedTables) {
+    for (String tableNameWithType : timeBoundaryTableNames) {
       
Preconditions.checkArgument(TableNameBuilder.isOfflineTableResource(tableNameWithType),
           "Invalid table in the time boundary config: %s", tableNameWithType);
       try {
@@ -653,6 +656,44 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     }
   }
 
+  public synchronized void removeRoutingForLogicalTable(String 
logicalTableName) {
+    LOGGER.info("Removing time boundary manager for logical table: {}", 
logicalTableName);
+    LogicalTableConfig logicalTableConfig =
+        ZKMetadataProvider.getLogicalTableConfig(_propertyStore, 
logicalTableName);
+    Preconditions.checkState(logicalTableConfig != null, "Failed to find 
logical table config for: %s",
+        logicalTableName);
+    if (!logicalTableConfig.isHybridLogicalTable()) {
+      LOGGER.info("Skip removing time boundary manager for non hybrid logical 
table: {}", logicalTableName);
+      return;
+    }
+    String strategy = 
logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy();
+    TimeBoundaryStrategy timeBoundaryStrategy =
+        
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(strategy);
+    List<String> timeBoundaryTableNames = 
timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig);
+    for (String tableNameWithType : timeBoundaryTableNames) {
+
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+        LOGGER.info("Skipping removing time boundary manager for real-time 
table: {}", tableNameWithType);
+        continue;
+      }
+
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+      if (_routingEntryMap.containsKey(realtimeTableName)) {
+        LOGGER.info("Skipping removing time boundary manager for hybrid 
physical table: {}", rawTableName);
+        continue;
+      }
+
+      RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+      if (routingEntry != null) {
+        routingEntry.setTimeBoundaryManager(null);
+        LOGGER.info("Removed time boundary manager for table: {}", 
tableNameWithType);
+      } else {
+        LOGGER.warn("Routing does not exist for table: {}, skipping", 
tableNameWithType);
+      }
+    }
+  }
+
   /**
    * Refreshes the metadata for the given segment (called when segment is 
getting refreshed).
    */
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 3f2bffd888..29c6f05516 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -110,6 +110,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     if (_sharedClusterTestSuite != this) {
       _controllerRequestURLBuilder = 
_sharedClusterTestSuite._controllerRequestURLBuilder;
       _helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
+      _kafkaStarters = _sharedClusterTestSuite._kafkaStarters;
     }
 
     _avroFiles = getAllAvroFiles();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
index d406b2ad56..ba594a303c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.Test;
@@ -62,8 +64,9 @@ public class 
LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B
 
   private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig 
logicalTableConfig)
       throws IOException {
-    List<String> includedTables =
-        (List<String>) 
logicalTableConfig.getTimeBoundaryConfig().getParameters().get("includedTables");
+    TimeBoundaryStrategy timeBoundaryStrategy = 
TimeBoundaryStrategyService.getInstance()
+        
.getTimeBoundaryStrategy(logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy());
+    List<String> includedTables = 
timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig);
 
     String timeBoundaryTableName = 
TableNameBuilder.extractRawTableName(includedTables.get(0));
     String newTimeBoundaryTableName = timeBoundaryTableName.equals("o_1") ? 
"o_2" : "o_1";
@@ -71,6 +74,7 @@ public class 
LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B
 
     Map<String, Object> parameters = Map.of("includedTables", 
List.of(newTimeBoundaryTableName));
     logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters);
+    logicalTableConfig.setQueryConfig(null);
 
     updateLogicalTableConfig(logicalTableConfig.getTableName(), 
logicalTableConfig);
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
index 5d3c98596b..04be7d3cdf 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
@@ -36,6 +36,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 @AutoService(TimeBoundaryStrategy.class)
 public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
 
+  public static final String INCLUDED_TABLES = "includedTables";
+
   @Override
   public String getName() {
     return "min";
@@ -76,4 +78,10 @@ public class MinTimeBoundaryStrategy implements 
TimeBoundaryStrategy {
     }
     return minTimeBoundaryInfo;
   }
+
+  @Override
+  public List<String> getTimeBoundaryTableNames(LogicalTableConfig 
logicalTableConfig) {
+    Map<String, Object> parameters = 
logicalTableConfig.getTimeBoundaryConfig().getParameters();
+    return parameters != null ? (List) 
parameters.getOrDefault(INCLUDED_TABLES, List.of()) : List.of();
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
index c1b97f28c5..7a4ee21794 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.timeboundary;
 
+import java.util.List;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -43,4 +44,12 @@ public interface TimeBoundaryStrategy {
    */
   TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, 
TableCache tableCache,
       RoutingManager routingManager);
+
+
+  /**
+   * Returns the list of physical table names that are part of the time 
boundary.
+   * @param logicalTableConfig The logical table configuration
+   * @return The list of physical table names that are part of the time 
boundary.
+   */
+  List<String> getTimeBoundaryTableNames(LogicalTableConfig 
logicalTableConfig);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to