shauryachats commented on code in PR #17145: URL: https://github.com/apache/pinot/pull/17145#discussion_r2535083033
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/FederatedRoutingManager.java: ########## @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.SegmentsToQuery; +import org.apache.pinot.core.routing.TablePartitionInfo; +import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo; +import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo; +import org.apache.pinot.core.transport.ServerInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * FederatedRoutingManager manages routing across multiple ZooKeeper clusters. 
+ * It maintains a primary BrokerRoutingManager and multiple secondary routing managers, + * each connected to their own ZooKeeper cluster. + */ +public class FederatedRoutingManager implements RoutingManager { + private static final Logger LOGGER = LoggerFactory.getLogger(FederatedRoutingManager.class); + + // Primary routing manager (connected to primary ZooKeeper cluster) + private final BrokerRoutingManager _primaryRoutingManager; + + // Secondary routing managers (connected to secondary ZooKeeper clusters) + private final List<BrokerRoutingManager> _secondaryRoutingManagers; + + // Combined routing information + private final Map<String, ServerInstance> _combinedServerInstanceMap = new ConcurrentHashMap<>(); + private final Map<String, RoutingTable> _combinedRoutingTableCache = new ConcurrentHashMap<>(); + + public FederatedRoutingManager(BrokerRoutingManager primaryRoutingManager, + List<BrokerRoutingManager> secondaryRoutingManagers) { + _primaryRoutingManager = primaryRoutingManager; + _secondaryRoutingManagers = secondaryRoutingManagers; + } + + private void updateCombinedServerInstances() { + _combinedServerInstanceMap.clear(); + + // Add server instances from primary routing manager + Map<String, ServerInstance> primaryServerInstances = _primaryRoutingManager.getEnabledServerInstanceMap(); + _combinedServerInstanceMap.putAll(primaryServerInstances); + + // Add server instances from secondary routing managers + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + Map<String, ServerInstance> secondaryServerInstances = secondaryRoutingManager.getEnabledServerInstanceMap(); + _combinedServerInstanceMap.putAll(secondaryServerInstances); + } + + LOGGER.info("Updated combined server instances. 
Total: {}", _combinedServerInstanceMap.size()); + } + + @Override + public boolean routingExists(String tableNameWithType) { + // Check if routing exists in primary routing manager + if (_primaryRoutingManager.routingExists(tableNameWithType)) { + return true; + } + + // Check if routing exists in any secondary routing manager + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + if (secondaryRoutingManager.routingExists(tableNameWithType)) { + return true; + } + } + + return false; + } + + @Override + public boolean isTableDisabled(String tableNameWithType) { + // Check if table is disabled in primary routing manager + if (_primaryRoutingManager.isTableDisabled(tableNameWithType)) { + return true; + } + + // Check if table is disabled in any secondary routing manager + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + if (secondaryRoutingManager.isTableDisabled(tableNameWithType)) { + return true; + } + } + + return false; + } + + @Nullable + @Override + public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) { + String tableNameWithType = brokerRequest.getQuerySource().getTableName(); + return getRoutingTable(brokerRequest, tableNameWithType, requestId); + } + + @Nullable + @Override + public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String tableNameWithType, long requestId) { + // Check cache first + String cacheKey = tableNameWithType + "_" + requestId; + RoutingTable cachedRoutingTable = _combinedRoutingTableCache.get(cacheKey); + if (cachedRoutingTable != null) { + return cachedRoutingTable; + } + + // Get routing table from primary routing manager + RoutingTable primaryRoutingTable = _primaryRoutingManager.getRoutingTable(brokerRequest, + tableNameWithType, requestId); + + // Combine with routing tables from secondary routing managers + RoutingTable combinedRoutingTable = combineRoutingTables(primaryRoutingTable, tableNameWithType, + brokerRequest, 
requestId); + + // Cache the result + if (combinedRoutingTable != null) { + _combinedRoutingTableCache.put(cacheKey, combinedRoutingTable); + } + + return combinedRoutingTable; + } + + private RoutingTable combineRoutingTables(RoutingTable primaryRoutingTable, String tableNameWithType, + BrokerRequest brokerRequest, long requestId) { + if (primaryRoutingTable == null) { + // If primary routing table is null, try to get routing table from secondary managers + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + RoutingTable secondaryRoutingTable = secondaryRoutingManager.getRoutingTable(brokerRequest, + tableNameWithType, requestId); + if (secondaryRoutingTable != null) { + return secondaryRoutingTable; + } + } + return null; + } + + // Start with primary routing table + Map<ServerInstance, SegmentsToQuery> combinedServerInstanceToSegmentsMap = + new HashMap<>(primaryRoutingTable.getServerInstanceToSegmentsMap()); + List<String> combinedUnavailableSegments = new ArrayList<>(primaryRoutingTable.getUnavailableSegments()); + int combinedNumPrunedSegments = primaryRoutingTable.getNumPrunedSegments(); + + // Add routing information from secondary routing managers + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + try { + RoutingTable secondaryRoutingTable = secondaryRoutingManager.getRoutingTable(brokerRequest, + tableNameWithType, requestId); + if (secondaryRoutingTable != null) { + // Combine server instance to segments map + for (Map.Entry<ServerInstance, SegmentsToQuery> entry + : secondaryRoutingTable.getServerInstanceToSegmentsMap().entrySet()) { + ServerInstance serverInstance = entry.getKey(); + SegmentsToQuery secondaryRouteInfo = entry.getValue(); + + SegmentsToQuery existingRouteInfo = combinedServerInstanceToSegmentsMap.get(serverInstance); + if (existingRouteInfo != null) { + // Merge segments + existingRouteInfo.getSegments().addAll(secondaryRouteInfo.getSegments()); + 
existingRouteInfo.getOptionalSegments().addAll(secondaryRouteInfo.getOptionalSegments()); + } else { + // Add new server instance + combinedServerInstanceToSegmentsMap.put(serverInstance, secondaryRouteInfo); + } + } + + // Combine unavailable segments + combinedUnavailableSegments.addAll(secondaryRoutingTable.getUnavailableSegments()); + + // Add pruned segments count + combinedNumPrunedSegments += secondaryRoutingTable.getNumPrunedSegments(); + } + } catch (Exception e) { + LOGGER.error("Error combining routing table from secondary routing manager for table {}", + tableNameWithType, e); + } + } + + return new RoutingTable(combinedServerInstanceToSegmentsMap, + combinedUnavailableSegments, combinedNumPrunedSegments); + } + + @Nullable + @Override + public TimeBoundaryInfo getTimeBoundaryInfo(String tableNameWithType) { + // Try to get time boundary info from primary routing manager first + TimeBoundaryInfo primaryTimeBoundaryInfo = _primaryRoutingManager.getTimeBoundaryInfo(tableNameWithType); + if (primaryTimeBoundaryInfo != null) { + return primaryTimeBoundaryInfo; + } + + // Try to get time boundary info from secondary routing managers + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + try { + TimeBoundaryInfo secondaryTimeBoundaryInfo = secondaryRoutingManager.getTimeBoundaryInfo(tableNameWithType); + if (secondaryTimeBoundaryInfo != null) { + return secondaryTimeBoundaryInfo; + } + } catch (Exception e) { + LOGGER.error("Error getting time boundary info from secondary routing manager for table {}", + tableNameWithType, e); + } + } + + return null; + } + + @Override + public Map<String, ServerInstance> getEnabledServerInstanceMap() { + updateCombinedServerInstances(); + return _combinedServerInstanceMap; + } + + @Override + public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) { + // Try to get table partition info from primary routing manager first + TablePartitionInfo primaryTablePartitionInfo = 
_primaryRoutingManager.getTablePartitionInfo(tableNameWithType); + if (primaryTablePartitionInfo != null) { + return primaryTablePartitionInfo; + } + + // Try to get table partition info from secondary routing managers + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + try { + TablePartitionInfo secondaryTablePartitionInfo = + secondaryRoutingManager.getTablePartitionInfo(tableNameWithType); + if (secondaryTablePartitionInfo != null) { + return secondaryTablePartitionInfo; + } + } catch (Exception e) { + LOGGER.error("Error getting table partition info from secondary routing manager for table {}", + tableNameWithType, e); + } + } + + return null; + } + + @Override + public Set<String> getServingInstances(String tableNameWithType) { + // Get serving instances from primary routing manager + Set<String> primaryServingInstances = _primaryRoutingManager.getServingInstances(tableNameWithType); + if (primaryServingInstances == null) { + primaryServingInstances = Collections.emptySet(); + } + // Combine with serving instances from secondary routing managers + Set<String> combinedServingInstances = new HashSet<>(primaryServingInstances); + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + try { + Set<String> secondaryServingInstances = secondaryRoutingManager.getServingInstances(tableNameWithType); + if (secondaryServingInstances != null) { + combinedServingInstances.addAll(secondaryServingInstances); + } + } catch (Exception e) { + LOGGER.error("Error getting serving instances from secondary routing manager for table {}", + tableNameWithType, e); + } + } + + return combinedServingInstances.isEmpty() ? 
null : combinedServingInstances; + } + + @Override + public List<String> getSegments(BrokerRequest brokerRequest) { + // Get segments from primary routing manager + List<String> primarySegments = _primaryRoutingManager.getSegments(brokerRequest); + + // Combine with segments from secondary routing managers + List<String> combinedSegments = new ArrayList<>(primarySegments); + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + try { + List<String> secondarySegments = secondaryRoutingManager.getSegments(brokerRequest); + combinedSegments.addAll(secondarySegments); + } catch (Exception e) { + LOGGER.error("Error getting segments from secondary routing manager", e); + } + } + + return combinedSegments; + } + + @Override + public TablePartitionReplicatedServersInfo getTablePartitionReplicatedServersInfo(String tableNameWithType) { + // Try to get table partition replicated servers info from primary routing manager first + TablePartitionReplicatedServersInfo primaryInfo = + _primaryRoutingManager.getTablePartitionReplicatedServersInfo(tableNameWithType); + if (primaryInfo != null) { + return primaryInfo; + } + + // Try to get table partition replicated servers info from secondary routing managers + for (BrokerRoutingManager secondaryRoutingManager : _secondaryRoutingManagers) { + try { + TablePartitionReplicatedServersInfo secondaryInfo = + secondaryRoutingManager.getTablePartitionReplicatedServersInfo(tableNameWithType); + if (secondaryInfo != null) { + return secondaryInfo; + } + } catch (Exception e) { + LOGGER.error( + "Error getting table partition replicated servers info from secondary routing manager for table {}", + tableNameWithType, e); + } + } + + return null; + } + + public RoutingManager getPrimaryRoutingManager(Map<String, String> queryOptions) { Review Comment: Good point. 
Instead of passing a RoutingManager to the RequestHandler methods, I could pass a RoutingManagerProvider and determine which RoutingManager to use at runtime. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
