This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch use_server_instance in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit eec13d444a3f00e24c3e81bfeef4fbba55019e49 Author: kishoreg <[email protected]> AuthorDate: Tue Nov 5 22:32:39 2019 -0800 Enhance RoutingTable from Map<String, List<String>> to Map<ServerInstance, List<String>> --- .../requesthandler/BaseBrokerRequestHandler.java | 9 ++-- .../ConnectionPoolBrokerRequestHandler.java | 12 ++--- .../SingleConnectionBrokerRequestHandler.java | 4 +- .../routing/HelixExternalViewBasedRouting.java | 7 +-- .../apache/pinot/broker/routing/RoutingTable.java | 3 +- .../builder/BalancedRandomRoutingTableBuilder.java | 11 +++-- .../BasePartitionAwareRoutingTableBuilder.java | 23 ++++----- .../routing/builder/BaseRoutingTableBuilder.java | 43 +++++++++-------- .../builder/DefaultOfflineRoutingTableBuilder.java | 5 +- .../DefaultRealtimeRoutingTableBuilder.java | 5 +- .../builder/GeneratorBasedRoutingTableBuilder.java | 56 +++++++++++----------- .../HighLevelConsumerBasedRoutingTableBuilder.java | 29 +++++------ .../LowLevelConsumerRoutingTableBuilder.java | 11 +++-- .../PartitionAwareOfflineRoutingTableBuilder.java | 9 ++-- .../PartitionAwareRealtimeRoutingTableBuilder.java | 11 +++-- .../routing/builder/RoutingTableBuilder.java | 5 +- .../broker/broker/HelixBrokerStarterTest.java | 3 +- .../broker/routing/RandomRoutingTableTest.java | 3 +- .../pinot/broker/routing/RoutingTableTest.java | 5 +- .../BalancedRandomRoutingTableBuilderTest.java | 11 +++-- .../HighLevelConsumerRoutingTableBuilderTest.java | 3 +- .../LargeClusterRoutingTableBuilderTest.java | 36 +++++++------- .../LowLevelConsumerRoutingTableBuilderTest.java | 17 +++---- ...rtitionAwareOfflineRoutingTableBuilderTest.java | 11 +++-- ...titionAwareRealtimeRoutingTableBuilderTest.java | 9 ++-- .../pinot/common/response/ServerInstance.java | 5 ++ .../apache/pinot/core/transport/QueryRouter.java | 15 +++--- .../org/apache/pinot/core/transport/Server.java | 6 +++ .../pinot/core/transport/QueryRouterTest.java | 5 +- .../transport/config/PerTableRoutingConfig.java | 7 +-- .../transport/scattergather/ScatterGatherImpl.java | 6 +-- .../scattergather/ScatterGatherRequest.java | 3 +- .../transport/perf/ScatterGatherPerfClient.java | 4 +- .../transport/scattergather/ScatterGatherTest.java | 18 +++---- 34 files changed, 226 insertions(+), 184 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 834148d..ac3382e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -53,6 +53,7 @@ import org.apache.pinot.common.request.FilterOperator; import org.apache.pinot.common.request.FilterQuery; import org.apache.pinot.common.request.FilterQueryMap; import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Broker; @@ -261,8 +262,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Calculate routing table for the query long routingStartTimeNs = System.nanoTime(); - Map<String, List<String>> offlineRoutingTable = null; - Map<String, List<String>> realtimeRoutingTable = null; + Map<ServerInstance, List<String>> offlineRoutingTable = null; + Map<ServerInstance, List<String>> realtimeRoutingTable = null; if (offlineBrokerRequest != null) { offlineRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(offlineBrokerRequest)); if (offlineRoutingTable.isEmpty()) { @@ -577,8 +578,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * Processes the optimized broker requests for both OFFLINE and REALTIME table. */ protected abstract BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) throws Exception; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java index 60a4312..aeaa44e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java @@ -131,8 +131,8 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler @Override protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) throws Exception { ScatterGatherStats scatterGatherStats = new ScatterGatherStats(); @@ -238,7 +238,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler * @return composite future used to gather responses. */ private CompositeFuture<byte[]> scatterBrokerRequest(long requestId, BrokerRequest brokerRequest, - Map<String, List<String>> routingTable, boolean isOfflineTable, long timeoutMs, + Map<ServerInstance, List<String>> routingTable, boolean isOfflineTable, long timeoutMs, ScatterGatherStats scatterGatherStats, PhaseTimes phaseTimes) throws InterruptedException { long scatterStartTimeNs = System.nanoTime(); @@ -355,12 +355,12 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler private static class ScatterGatherRequestImpl implements ScatterGatherRequest { private final BrokerRequest _brokerRequest; - private final Map<String, List<String>> _routingTable; + private final Map<ServerInstance, List<String>> _routingTable; private final long _requestId; private final long _requestTimeoutMs; private final String _brokerId; - public ScatterGatherRequestImpl(BrokerRequest request, Map<String, List<String>> routingTable, long requestId, + public ScatterGatherRequestImpl(BrokerRequest request, Map<ServerInstance, List<String>> routingTable, long requestId, long requestTimeoutMs, String brokerId) { _brokerRequest = request; _routingTable = routingTable; @@ -370,7 +370,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler } @Override - public Map<String, List<String>> getRoutingTable() { + public Map<ServerInstance, List<String>> getRoutingTable() { return _routingTable; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index d7a4890..0674af2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -71,8 +71,8 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl @Override protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) throws Exception { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java index 63ab545..857f44c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java @@ -50,6 +50,7 @@ import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.BrokerTimer; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.EqualityUtils; import org.apache.pinot.common.utils.JsonUtils; @@ -112,7 +113,7 @@ public class HelixExternalViewBasedRouting implements ClusterChangeHandler, Rout } @Override - public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) { + public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request) { String tableName = request.getTableName(); RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableName); return routingTableBuilder.getRoutingTable(request, _segmentSelectorMap.get(tableName)); @@ -609,8 +610,8 @@ public class HelixExternalViewBasedRouting implements ClusterChangeHandler, Rout ArrayNode entries = JsonUtils.newArrayNode(); RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(currentTable); - List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); - for (Map<String, List<String>> routingTable : routingTables) { + List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); + for (Map<ServerInstance, List<String>> routingTable : routingTables) { entries.add(JsonUtils.objectToJsonNode(routingTable)); } tableEntry.set("routingTableEntries", entries); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java index 5cad348..18717ff 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java @@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing; import java.util.List; import java.util.Map; +import org.apache.pinot.common.response.ServerInstance; /** @@ -33,7 +34,7 @@ public interface RoutingTable { * @param request Routing table lookup request * @return Map from server to list of segments */ - Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request); + Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request); /** * Return whether the routing table for the given table exists. diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java index 3fb1e2d..761be58 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java @@ -28,6 +28,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; /** @@ -46,15 +47,15 @@ public class BalancedRandomRoutingTableBuilder extends BaseRoutingTableBuilder { _numRoutingTables = configuration.getInt(NUM_ROUTING_TABLES_KEY, DEFAULT_NUM_ROUTING_TABLES); } - protected List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap( - Map<String, List<String>> segmentToServersMap) { - List<Map<String, List<String>>> routingTables = new ArrayList<>(_numRoutingTables); + protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap( + Map<String, List<ServerInstance>> segmentToServersMap) { + List<Map<ServerInstance, List<String>>> routingTables = new ArrayList<>(_numRoutingTables); Set<String> segmentsToQuery = segmentToServersMap.keySet(); for (int i = 0; i < _numRoutingTables; i++) { - Map<String, List<String>> routingTable = new HashMap<>(); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(); for (String segmentName : segmentsToQuery) { - List<String> servers = segmentToServersMap.get(segmentName); + List<ServerInstance> servers = segmentToServersMap.get(segmentName); routingTable.get(getServerWithLeastSegmentsAssigned(servers, routingTable)).add(segmentName); } routingTables.add(routingTable); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java index 7b8b182..4d4f465 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java @@ -37,6 +37,7 @@ import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa // Map from segment name to map from replica id to server // Set variable as volatile so all threads can get the up-to-date map - protected volatile Map<String, Map<Integer, String>> _segmentToReplicaToServerMap; + protected volatile Map<String, Map<Integer, ServerInstance>> _segmentToReplicaToServerMap; // Cache for segment zk metadata to reduce the lookup to ZK store protected Map<String, SegmentZKMetadata> _segmentToZkMetadataMapping = new ConcurrentHashMap<>(); @@ -87,9 +88,9 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa } @Override - public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { + public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { // Copy the reference for the current segment to replica to server mapping for snapshot - Map<String, Map<Integer, String>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap; + Map<String, Map<Integer, ServerInstance>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap; // Get all available segments for table Set<String> segmentsToQuery = segmentToReplicaToServerMap.keySet(); @@ -99,7 +100,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery); } - Map<String, List<String>> routingTable = new HashMap<>(); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(); SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest()); // Shuffle the replica group ids in order to satisfy: @@ -119,19 +120,19 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa if (!segmentPruned) { // 2b. Segment cannot be pruned. Assign the segment to a server based on the shuffled replica group ids - Map<Integer, String> replicaIdToServerMap = segmentToReplicaToServerMap.get(segmentName); + Map<Integer, ServerInstance> replicaIdToServerMap = segmentToReplicaToServerMap.get(segmentName); - String serverName = null; + ServerInstance serverInstance = null; for (int i = 0; i < _numReplicas; i++) { - serverName = replicaIdToServerMap.get(shuffledReplicaGroupIds[i]); + serverInstance = replicaIdToServerMap.get(shuffledReplicaGroupIds[i]); // If a server is found, update routing table for the current segment - if (serverName != null) { + if (serverInstance != null) { break; } } - if (serverName != null) { - routingTable.computeIfAbsent(serverName, k -> new ArrayList<>()).add(segmentName); + if (serverInstance != null) { + routingTable.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(segmentName); } else { // No server is found for this segment if the code reach here @@ -145,7 +146,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa } @Override - public List<Map<String, List<String>>> getRoutingTables() { + public List<Map<ServerInstance, List<String>>> getRoutingTables() { throw new UnsupportedOperationException("Partition aware routing table cannot be pre-computed"); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java index 7c63d43..7418f64 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java @@ -36,6 +36,7 @@ import org.apache.pinot.common.config.RoutingConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,10 +55,10 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { // Set variable as volatile so all threads can get the up-to-date routing tables // Routing tables are used for storing pre-computed routing table - protected volatile List<Map<String, List<String>>> _routingTables; + protected volatile List<Map<ServerInstance, List<String>>> _routingTables; // A mapping of segments to servers is used for dynamic routing table building process - protected volatile Map<String, List<String>> _segmentToServersMap; + protected volatile Map<String, List<ServerInstance>> _segmentToServersMap; @Override public void init(Configuration configuration, TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore, @@ -76,13 +77,13 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { } } - protected static String getServerWithLeastSegmentsAssigned(List<String> servers, - Map<String, List<String>> routingTable) { + protected static ServerInstance getServerWithLeastSegmentsAssigned(List<ServerInstance> servers, + Map<ServerInstance, List<String>> routingTable) { Collections.shuffle(servers); - String selectedServer = null; + ServerInstance selectedServer = null; int minNumSegmentsAssigned = Integer.MAX_VALUE; - for (String server : servers) { + for (ServerInstance server : servers) { List<String> segments = routingTable.get(server); if (segments == null) { routingTable.put(server, new ArrayList<>()); @@ -114,7 +115,7 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { @Override public void computeOnExternalViewChange(String tableName, ExternalView externalView, List<InstanceConfig> instanceConfigs) { - Map<String, List<String>> segmentToServersMap = + Map<String, List<ServerInstance>> segmentToServersMap = computeSegmentToServersMapFromExternalView(externalView, instanceConfigs); if (_enableDynamicComputing) { @@ -122,15 +123,15 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { _segmentToServersMap = segmentToServersMap; } else { // Otherwise, we cache the pre-computed routing tables - List<Map<String, List<String>>> routingTables = computeRoutingTablesFromSegmentToServersMap(segmentToServersMap); + List<Map<ServerInstance, List<String>>> routingTables = computeRoutingTablesFromSegmentToServersMap(segmentToServersMap); _routingTables = routingTables; } } - public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { + public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { if (_enableDynamicComputing) { // Copy the pointer for snapshot since the pointer for segment to servers map can change at anytime - Map<String, List<String>> segmentToServersMap = _segmentToServersMap; + Map<String, List<ServerInstance>> segmentToServersMap = _segmentToServersMap; // Selecting segments only required for processing a query Set<String> segmentsToQuery = segmentToServersMap.keySet(); @@ -147,7 +148,7 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { } @Override - public List<Map<String, List<String>>> getRoutingTables() { + public List<Map<ServerInstance, List<String>>> getRoutingTables() { return _routingTables; } @@ -158,12 +159,12 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { * @param segmentsToQuery a list of segments that need to be processed for a particular query * @return a routing table */ - public Map<String, List<String>> computeDynamicRoutingTable(Map<String, List<String>> segmentToServersMap, + public Map<ServerInstance, List<String>> computeDynamicRoutingTable(Map<String, List<ServerInstance>> segmentToServersMap, Set<String> segmentsToQuery) { - Map<String, List<String>> routingTable = new HashMap<>(); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(); for (String segmentName : segmentsToQuery) { - List<String> servers = segmentToServersMap.get(segmentName); - String selectedServer = servers.get(_random.nextInt(servers.size())); + List<ServerInstance> servers = segmentToServersMap.get(segmentName); + ServerInstance selectedServer = servers.get(_random.nextInt(servers.size())); List<String> segments = routingTable.computeIfAbsent(selectedServer, k -> new ArrayList<>()); segments.add(segmentName); } @@ -178,18 +179,18 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { * @param instanceConfigs a list of instance config * @return a mapping of segment to servers */ - protected Map<String, List<String>> computeSegmentToServersMapFromExternalView(ExternalView externalView, + protected Map<String, List<ServerInstance>> computeSegmentToServersMapFromExternalView(ExternalView externalView, List<InstanceConfig> instanceConfigs) { - Map<String, List<String>> segmentToServersMap = new HashMap<>(); + Map<String, List<ServerInstance>> segmentToServersMap = new HashMap<>(); RoutingTableInstancePruner instancePruner = new RoutingTableInstancePruner(instanceConfigs); for (String segmentName : externalView.getPartitionSet()) { // List of servers that are active and are serving the segment - List<String> servers = new ArrayList<>(); + List<ServerInstance> servers = new ArrayList<>(); for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) { String serverName = entry.getKey(); if (entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE) && !instancePruner.isInactive(serverName)) { - servers.add(serverName); + servers.add(ServerInstance.forInstanceName(serverName)); } } if (!servers.isEmpty()) { @@ -208,6 +209,6 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder { * @param segmentToServersMap a mapping of segment to servers * @return a list of final routing tables */ - protected abstract List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap( - Map<String, List<String>> segmentToServersMap); + protected abstract List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap( + Map<String, List<ServerInstance>> segmentToServersMap); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java index 8bdb9df..7be412b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java @@ -31,6 +31,7 @@ import org.apache.pinot.broker.routing.RoutingTableLookupRequest; import org.apache.pinot.broker.routing.selector.SegmentSelector; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,12 +127,12 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder { } @Override - public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { + public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { return _routingTableBuilder.getRoutingTable(request, segmentSelector); } @Override - public List<Map<String, List<String>>> getRoutingTables() { + public List<Map<ServerInstance, List<String>>> getRoutingTables() { return _routingTableBuilder.getRoutingTables(); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java index 49ffc18..2561cf1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java @@ -31,6 +31,7 @@ import org.apache.pinot.broker.routing.RoutingTableLookupRequest; import org.apache.pinot.broker.routing.selector.SegmentSelector; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.SegmentName; @@ -73,7 +74,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder { } @Override - public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { + public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) { boolean forceLLC = false; boolean forceHLC = false; for (String routingOption : request.getRoutingOptions()) { @@ -105,7 +106,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder { } @Override - public List<Map<String, List<String>>> getRoutingTables() { + public List<Map<ServerInstance, List<String>>> getRoutingTables() { if (_hasLLC) { return _realtimeLLCRoutingTableBuilder.getRoutingTables(); } else if (_hasHLC) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java index 592bd6e..1dcf8f9 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java @@ -28,6 +28,7 @@ import java.util.PriorityQueue; import java.util.Set; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.response.ServerInstance; /** @@ -47,10 +48,11 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable * Generates a routing table, decorated with a metric. * * @return A pair of a routing table and its associated metric. + * @param segmentToServersMap */ - private Pair<Map<String, List<String>>, Float> generateRoutingTableWithMetric( - Map<String, List<String>> segmentToServersMap) { - Map<String, List<String>> routingTable = generateRoutingTable(segmentToServersMap); + private Pair<Map<ServerInstance, List<String>>, Float> generateRoutingTableWithMetric( + Map<String, List<ServerInstance>> segmentToServersMap) { + Map<ServerInstance, List<String>> routingTable = generateRoutingTable(segmentToServersMap); int segmentCount = 0; int serverCount = 0; @@ -73,26 +75,26 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable return new ImmutablePair<>(routingTable, variance); } - private Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) { + private Map<ServerInstance, List<String>> generateRoutingTable(Map<String, List<ServerInstance>> segmentToServersMap) { - Map<String, List<String>> routingTable = new HashMap<>(); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(); if (segmentToServersMap.isEmpty()) { return routingTable; } // Construct the map from server to list of segments - Map<String, List<String>> serverToSegmentsMap = new HashMap<>(); - for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) { - List<String> servers = entry.getValue(); - for (String serverName : servers) { - List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverName, k -> new ArrayList<>()); + Map<ServerInstance, List<String>> serverToSegmentsMap = new HashMap<>(); + for (Map.Entry<String, List<ServerInstance>> entry : segmentToServersMap.entrySet()) { + List<ServerInstance> servers = entry.getValue(); + for (ServerInstance serverInstance : servers) { + List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverInstance, k -> new ArrayList<>()); segmentsForServer.add(entry.getKey()); } } int numSegments = segmentToServersMap.size(); - List<String> servers = new ArrayList<>(serverToSegmentsMap.keySet()); + List<ServerInstance> servers = new ArrayList<>(serverToSegmentsMap.keySet()); int numServers = servers.size(); // Set of segments that have no instance serving them @@ -100,7 +102,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable // Set of servers in this routing table int targetNumServersPerQuery = getTargetNumServersPerQuery(); - Set<String> serversInRoutingTable = new HashSet<>(targetNumServersPerQuery); + Set<ServerInstance> serversInRoutingTable = new HashSet<>(targetNumServersPerQuery); if (numServers <= targetNumServersPerQuery) { // If there are not enough instances, add them all @@ -109,7 +111,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable } else { // Otherwise add _targetNumServersPerQuery instances while (serversInRoutingTable.size() < targetNumServersPerQuery) { - String randomServer = servers.get(_random.nextInt(numServers)); + ServerInstance randomServer = servers.get(_random.nextInt(numServers)); if (!serversInRoutingTable.contains(randomServer)) { serversInRoutingTable.add(randomServer); segmentsNotHandledByServers.removeAll(serverToSegmentsMap.get(randomServer)); @@ -122,32 +124,32 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable String segmentNotHandledByServers = segmentsNotHandledByServers.iterator().next(); // Pick a random server that can serve this segment - List<String> serversForSegment = segmentToServersMap.get(segmentNotHandledByServers); - String randomServer = serversForSegment.get(_random.nextInt(serversForSegment.size())); + List<ServerInstance> serversForSegment = segmentToServersMap.get(segmentNotHandledByServers); + ServerInstance randomServer = serversForSegment.get(_random.nextInt(serversForSegment.size())); serversInRoutingTable.add(randomServer); segmentsNotHandledByServers.removeAll(serverToSegmentsMap.get(randomServer)); } // Sort all the segments to be used during assignment in ascending order of replicas - PriorityQueue<Pair<String, List<String>>> segmentToReplicaSetQueue = + PriorityQueue<Pair<String, List<ServerInstance>>> segmentToReplicaSetQueue = new PriorityQueue<>(numSegments, Comparator.comparingInt(pair -> pair.getRight().size())); - for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) { + for (Map.Entry<String, List<ServerInstance>> entry : segmentToServersMap.entrySet()) { // Servers for the segment is the intersection of all servers for this segment and the servers that we have in // this routing table - List<String> serversForSegment = new ArrayList<>(entry.getValue()); + List<ServerInstance> serversForSegment = new ArrayList<>(entry.getValue()); serversForSegment.retainAll(serversInRoutingTable); segmentToReplicaSetQueue.add(new ImmutablePair<>(entry.getKey(), serversForSegment)); } // Assign each segment to a server - Pair<String, List<String>> segmentServersPair; + Pair<String, List<ServerInstance>> segmentServersPair; while ((segmentServersPair = segmentToReplicaSetQueue.poll()) != null) { String segmentName = segmentServersPair.getLeft(); - List<String> serversForSegment = segmentServersPair.getRight(); + List<ServerInstance> serversForSegment = segmentServersPair.getRight(); - String serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable); + ServerInstance serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable); List<String> segmentsAssignedToServer = routingTable.computeIfAbsent(serverWithLeastSegmentsAssigned, k -> new ArrayList<>()); segmentsAssignedToServer.add(segmentName); @@ -220,8 +222,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable */ @Override - protected List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap( - Map<String, List<String>> segmentToServersMap) { + protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap( + Map<String, List<ServerInstance>> segmentToServersMap) { // The default routing table algorithm tries to balance all available segments across all servers, so that each // server is hit on every query. This works fine with small clusters (say less than 20 servers) but for larger // clusters, this adds up to significant overhead (one request must be enqueued for each server, processed, @@ -260,7 +262,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable // in workload per server across all the routing tables. To do so, we generate an initial set of routing tables // according to a per-routing table metric and discard the worst routing tables. - PriorityQueue<Pair<Map<String, List<String>>, Float>> topRoutingTables = + PriorityQueue<Pair<Map<ServerInstance, List<String>>, Float>> topRoutingTables = new PriorityQueue<>(ROUTING_TABLE_COUNT, (left, right) -> { // Float.compare sorts in ascending order and we want a max heap, so we need to return the negative // of the comparison @@ -273,8 +275,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable // Generate routing more tables and keep the ROUTING_TABLE_COUNT top ones for (int i = 0; i < (ROUTING_TABLE_GENERATION_COUNT - ROUTING_TABLE_COUNT); ++i) { - Pair<Map<String, List<String>>, Float> newRoutingTable = generateRoutingTableWithMetric(segmentToServersMap); - Pair<Map<String, List<String>>, Float> worstRoutingTable = topRoutingTables.peek(); + Pair<Map<ServerInstance, List<String>>, Float> newRoutingTable = generateRoutingTableWithMetric(segmentToServersMap); + Pair<Map<ServerInstance, List<String>>, Float> worstRoutingTable = topRoutingTables.peek(); // If the new routing table is better than the worst one, keep it if (newRoutingTable.getRight() < worstRoutingTable.getRight()) { @@ -284,7 +286,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable } // Return the best routing tables - List<Map<String, List<String>>> routingTables = new ArrayList<>(topRoutingTables.size()); + List<Map<ServerInstance, List<String>>> routingTables = new ArrayList<>(topRoutingTables.size()); while (!topRoutingTables.isEmpty()) { routingTables.add(topRoutingTables.poll().getKey()); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java index e1368e6..1aa3908 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.HLCSegmentName; import org.apache.pinot.common.utils.SegmentName; @@ -33,17 +34,17 @@ import org.apache.pinot.common.utils.SegmentName; public class HighLevelConsumerBasedRoutingTableBuilder extends BaseRoutingTableBuilder { @Override - protected Map<String, List<String>> computeSegmentToServersMapFromExternalView(ExternalView externalView, + protected Map<String, List<ServerInstance>> computeSegmentToServersMapFromExternalView(ExternalView externalView, List<InstanceConfig> instanceConfigs) { - Map<String, List<String>> segmentToServersMap = new HashMap<>(); + Map<String, List<ServerInstance>> segmentToServersMap = new HashMap<>(); RoutingTableInstancePruner instancePruner = new RoutingTableInstancePruner(instanceConfigs); for (String segmentName : externalView.getPartitionSet()) { - List<String> servers = new ArrayList<>(); + List<ServerInstance> servers = new ArrayList<>(); for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) { String serverName = entry.getKey(); if (entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE) && !instancePruner.isInactive(serverName) && SegmentName.isHighLevelConsumerSegmentName(segmentName)) { - servers.add(serverName); + servers.add(ServerInstance.forInstanceName(serverName)); } } if (servers.size() != 0) { @@ -56,20 +57,20 @@ public class HighLevelConsumerBasedRoutingTableBuilder extends BaseRoutingTableB } @Override - protected List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap( - Map<String, List<String>> segmentsToServerMap) { - List<Map<String, List<String>>> routingTables = new ArrayList<>(); - Map<String, Map<String, List<String>>> groupIdToRouting = new HashMap<>(); - for (Map.Entry<String, List<String>> entry : segmentsToServerMap.entrySet()) { + protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap( + Map<String, List<ServerInstance>> segmentsToServerMap) { + List<Map<ServerInstance, List<String>>> routingTables = new ArrayList<>(); + Map<String, Map<ServerInstance, List<String>>> groupIdToRouting = new HashMap<>(); + for (Map.Entry<String, List<ServerInstance>> entry : segmentsToServerMap.entrySet()) { String segmentName = entry.getKey(); HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName); String groupId = hlcSegmentName.getGroupId(); - Map<String, List<String>> routingTableForGroupId = + Map<ServerInstance, List<String>> routingTableForGroupId = groupIdToRouting.computeIfAbsent(groupId, k -> new HashMap<>()); - List<String> servers = entry.getValue(); - for (String serverName : servers) { - List<String> segmentsForServer = routingTableForGroupId.computeIfAbsent(serverName, k -> new ArrayList<>()); + List<ServerInstance> servers = entry.getValue(); + for (ServerInstance serverInstance : servers) { + List<String> segmentsForServer = routingTableForGroupId.computeIfAbsent(serverInstance, k -> new ArrayList<>()); segmentsForServer.add(segmentName); } } @@ -78,7 +79,7 @@ public class HighLevelConsumerBasedRoutingTableBuilder extends BaseRoutingTableB } @Override - public Map<String, List<String>> computeDynamicRoutingTable(Map<String, List<String>> segmentToServersMap, + public Map<ServerInstance, List<String>> computeDynamicRoutingTable(Map<String, List<ServerInstance>> segmentToServersMap, Set<String> segmentsToQuery) { throw new UnsupportedOperationException( "Dynamic routing table computation for high level consumer base routing is not supported"); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java index 4a39da2..4749e60 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java @@ -30,6 +30,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCUtils; import org.apache.pinot.common.utils.SegmentName; @@ -66,7 +67,7 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa } @Override - protected Map<String, List<String>> computeSegmentToServersMapFromExternalView(ExternalView externalView, + protected Map<String, List<ServerInstance>> computeSegmentToServersMapFromExternalView(ExternalView externalView, List<InstanceConfig> instanceConfigs) { // We build the segment to servers mapping here. What we want to do is to make sure that we uphold // the guarantees clients expect (no duplicate records, eventual consistency) and spreading the load as equally as @@ -80,7 +81,7 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa // The upstream code in BaseRoutingTableGenerator will generate routing tables based on taking a subset of servers // if the cluster is large enough as well as ensure that the best routing tables are used for routing. - Map<String, List<String>> segmentToServersMap = new HashMap<>(); + Map<String, List<ServerInstance>> segmentToServersMap = new HashMap<>(); // 1. Gather all segments and group them by partition, sorted by sequence number Map<String, SortedSet<SegmentName>> sortedSegmentsByStreamPartition = @@ -103,7 +104,7 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa SegmentName validConsumingSegment = allowedSegmentInConsumingStateByPartition.get(partitionId); for (SegmentName segmentName : segmentNames) { - List<String> validServers = new ArrayList<>(); + List<ServerInstance> validServers = new ArrayList<>(); String segmentNameStr = segmentName.getSegmentName(); Map<String, String> externalViewState = externalView.getStateMap(segmentNameStr); @@ -118,14 +119,14 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa // Replicas in ONLINE state are always allowed if (state.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { - validServers.add(instance); + validServers.add(ServerInstance.forInstanceName(instance)); continue; } // If the server is in CONSUMING status, the segment has to be match with the valid consuming segment if (state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING) && validConsumingSegment != null && segmentNameStr.equals(validConsumingSegment.getSegmentName())) { - validServers.add(instance); + validServers.add(ServerInstance.forInstanceName(instance)); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java index d8df632..c105d97 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java @@ -38,6 +38,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; @@ -135,14 +136,14 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware } // 3. Compute the final routing look up table - Map<String, Map<Integer, String>> segmentToReplicaToServerMap = new HashMap<>(); + Map<String, Map<Integer, ServerInstance>> segmentToReplicaToServerMap = new HashMap<>(); for (String segmentName : segmentSet) { // Get partition_id from cached segment zk metadata SegmentZKMetadata segmentZKMetadata = _segmentToZkMetadataMapping.get(segmentName); int partitionId = getPartitionId(segmentZKMetadata); // Initialize data intermediate data structures or data - Map<Integer, String> replicaToServerMap = new HashMap<>(); + Map<Integer, ServerInstance> replicaToServerMap = new HashMap<>(); int replicaIdForNoPartitionMetadata = 0; for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) { @@ -151,10 +152,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware && !instancePruner.isInactive(serverName)) { // If there's no partition number in the metadata, assign replica id sequentially. if (partitionId == NO_PARTITION_NUMBER) { - replicaToServerMap.put(replicaIdForNoPartitionMetadata++, serverName); + replicaToServerMap.put(replicaIdForNoPartitionMetadata++, ServerInstance.forInstanceName(serverName)); } else { int replicaId = partitionToServerToReplicaMap.get(partitionId).get(serverName); - replicaToServerMap.put(replicaId, serverName); + replicaToServerMap.put(replicaId, ServerInstance.forInstanceName(serverName)); } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java index f302dc46..c9dbbb6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.LLCUtils; @@ -84,12 +85,12 @@ public class PartitionAwareRealtimeRoutingTableBuilder extends BasePartitionAwar RoutingTableInstancePruner instancePruner = new RoutingTableInstancePruner(instanceConfigs); // Compute map from segment to map from replica to server - Map<String, Map<Integer, String>> segmentToReplicaToServerMap = new HashMap<>(); + Map<String, Map<Integer, ServerInstance>> segmentToReplicaToServerMap = new HashMap<>(); for (String segmentName : segmentSet) { int partitionId = getPartitionId(segmentName); SegmentName validConsumingSegment = allowedSegmentInConsumingStateByPartition.get(Integer.toString(partitionId)); - Map<Integer, String> replicaToServerMap = new HashMap<>(); + Map<Integer, ServerInstance> replicaToServerMap = new HashMap<>(); int replicaId = 0; for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) { String serverName = entry.getKey(); @@ -102,13 +103,13 @@ public class PartitionAwareRealtimeRoutingTableBuilder extends BasePartitionAwar // If the server is in ONLINE status, it's always to safe to add if (state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { - replicaToServerMap.put(replicaId++, serverName); + replicaToServerMap.put(replicaId++, ServerInstance.forInstanceName(serverName)); } // If the server is in CONSUMING status, the segment has to be match with the valid consuming segment if (state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING) && validConsumingSegment != null && segmentName.equals(validConsumingSegment.getSegmentName())) { - replicaToServerMap.put(replicaId++, serverName); + replicaToServerMap.put(replicaId++, ServerInstance.forInstanceName(serverName)); } } @@ -129,7 +130,7 @@ public class PartitionAwareRealtimeRoutingTableBuilder extends BasePartitionAwar // Get the unique set of replica ids and find the maximum id to update the number of replicas Set<Integer> replicaGroupIds = new HashSet<>(); - for (Map<Integer, String> replicaToServer : segmentToReplicaToServerMap.values()) { + for (Map<Integer, ServerInstance> replicaToServer : segmentToReplicaToServerMap.values()) { replicaGroupIds.addAll(replicaToServer.keySet()); } int numReplicas = Collections.max(replicaGroupIds) + 1; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java index 7745d83..67c59db 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java @@ -29,6 +29,7 @@ import org.apache.pinot.broker.routing.RoutingTableLookupRequest; import org.apache.pinot.broker.routing.selector.SegmentSelector; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; /** @@ -54,10 +55,10 @@ public interface RoutingTableBuilder { * TODO: we need to consider relocating segment selector into the routing table builder instead of passing it * from outside. */ - Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector); + Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector); /** * Get all pre-computed routing tables. */ - List<Map<String, List<String>>> getRoutingTables(); + List<Map<ServerInstance, List<String>>> getRoutingTables(); } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index ccc6377..e7c0d4d 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.data.FieldSpec; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants.Broker; import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; @@ -138,7 +139,7 @@ public class HelixBrokerStarterTest extends ControllerTest { assertTrue(routing.routingTableExists(REALTIME_TABLE_NAME)); RoutingTableLookupRequest routingTableLookupRequest = new RoutingTableLookupRequest(OFFLINE_TABLE_NAME); - Map<String, List<String>> routingTable = routing.getRoutingTable(routingTableLookupRequest); + Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(routingTableLookupRequest); assertEquals(routingTable.size(), NUM_SERVERS); assertEquals(routingTable.values().iterator().next().size(), NUM_OFFLINE_SEGMENTS); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java index 5b5cdd6..8ac1be0 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java @@ -35,6 +35,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableConfig.Builder; import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.mockito.Mockito; import org.testng.Assert; @@ -65,7 +66,7 @@ public class RandomRoutingTableTest { routing.markDataResourceOnline(generateTableConfig(tableName), externalView, instanceConfigs); for (int i = 0; i < NUM_ROUNDS; i++) { - Map<String, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(tableName)); + Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(tableName)); Assert.assertEquals(routingTable.size(), numServersInEV); int numSegments = 0; for (List<String> segmentsForServer : routingTable.values()) { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java index e583799..8d5051c 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java @@ -39,6 +39,7 @@ import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.HLCSegmentName; import org.apache.pinot.common.utils.LLCSegmentName; @@ -150,7 +151,7 @@ public class RoutingTableTest { private void assertResourceRequest(HelixExternalViewBasedRouting routing, String resource, String expectedSegmentList, int expectedNumSegment) { - Map<String, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource)); + Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource)); List<String> selectedSegments = new ArrayList<>(); for (List<String> segmentsForServer : routingTable.values()) { selectedSegments.addAll(segmentsForServer); @@ -257,7 +258,7 @@ public class RoutingTableTest { private void assertResourceRequest(HelixExternalViewBasedRouting routing, String resource, String[] expectedSegmentLists, int expectedNumSegment) { - Map<String, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource)); + Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource)); List<String> selectedSegments = new ArrayList<>(); for (List<String> segmentsForServer : routingTable.values()) { selectedSegments.addAll(segmentsForServer); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java index cc18fec..e9de507 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java @@ -29,6 +29,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.broker.routing.RoutingTableLookupRequest; import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.response.ServerInstance; import org.testng.Assert; import org.testng.annotations.Test; @@ -50,13 +51,13 @@ public class BalancedRandomRoutingTableBuilderTest { // Build routing table routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList); - List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); + List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); // Check that at least two routing tables are different - Iterator<Map<String, List<String>>> routingTableIterator = routingTables.iterator(); - Map<String, List<String>> previous = routingTableIterator.next(); + Iterator<Map<ServerInstance, List<String>>> routingTableIterator = routingTables.iterator(); + Map<ServerInstance, List<String>> previous = routingTableIterator.next(); while (routingTableIterator.hasNext()) { - Map<String, List<String>> current = routingTableIterator.next(); + Map<ServerInstance, List<String>> current = routingTableIterator.next(); if (!current.equals(previous)) { return; } @@ -83,7 +84,7 @@ public class BalancedRandomRoutingTableBuilderTest { // Build routing table routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList); RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType); - Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null); + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null); Set<String> segmentsInRoutingTable = new HashSet<>(); for (List<String> segments : routingTable.values()) { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java index 225f3f8..01676ab 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java @@ -30,6 +30,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.broker.routing.RoutingTableLookupRequest; import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.HLCSegmentName; import org.testng.Assert; @@ -99,7 +100,7 @@ public class HighLevelConsumerRoutingTableBuilderTest { // Check if the routing table result is correct for (int run = 0; run < MAX_NUM_GROUPS * 10; run++) { RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType); - Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null); + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null); Set<String> coveredSegments = new HashSet<>(); for (List<String> segmentsForServer : routingTable.values()) { coveredSegments.addAll(segmentsForServer); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java index f081211..e1be01c 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.testng.Assert; import org.testng.annotations.Test; @@ -41,7 +42,7 @@ public class LargeClusterRoutingTableBuilderTest { private LargeClusterRoutingTableBuilder _largeClusterRoutingTableBuilder = new LargeClusterRoutingTableBuilder(); private interface RoutingTableValidator { - boolean isRoutingTableValid(Map<String, List<String>> routingTable, ExternalView externalView, + boolean isRoutingTableValid(Map<ServerInstance, List<String>> routingTable, ExternalView externalView, List<InstanceConfig> instanceConfigs); } @@ -49,7 +50,7 @@ public class LargeClusterRoutingTableBuilderTest { public void testRoutingTableCoversAllSegmentsExactlyOnce() { validateAssertionOverMultipleRoutingTables(new RoutingTableValidator() { @Override - public boolean isRoutingTableValid(Map<String, List<String>> routingTable, ExternalView externalView, + public boolean isRoutingTableValid(Map<ServerInstance, List<String>> routingTable, ExternalView externalView, List<InstanceConfig> instanceConfigs) { Set<String> unassignedSegments = new HashSet<>(); unassignedSegments.addAll(externalView.getPartitionSet()); @@ -94,12 +95,13 @@ public class LargeClusterRoutingTableBuilderTest { validateAssertionForOneRoutingTable(new RoutingTableValidator() { @Override - public boolean isRoutingTableValid(Map<String, List<String>> routingTable, ExternalView externalView, + public boolean isRoutingTableValid(Map<ServerInstance, List<String>> routingTable, ExternalView externalView, List<InstanceConfig> instanceConfigs) { - for (String serverName : routingTable.keySet()) { + for (ServerInstance serverInstance : routingTable.keySet()) { // These servers should not appear in the routing table - if (serverName.equals(disabledHelixInstanceName) || serverName.equals(shuttingDownInstanceName) || - serverName.equals(queriesDisabledInstanceName)) { + String instanceName = serverInstance.getInstanceName(); + if (instanceName.equals(disabledHelixInstanceName) || instanceName.equals(shuttingDownInstanceName) || + instanceName.equals(queriesDisabledInstanceName)) { return false; } } @@ -122,12 +124,12 @@ public class LargeClusterRoutingTableBuilderTest { _largeClusterRoutingTableBuilder.computeOnExternalViewChange(tableName, externalView, instanceConfigs); - List<Map<String, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables(); + List<Map<ServerInstance, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables(); int routingTableCount = 0; int largerThanDesiredRoutingTableCount = 0; - for (Map<String, List<String>> routingTable : routingTables) { + for (Map<ServerInstance, List<String>> routingTable : routingTables) { routingTableCount++; if (desiredServerCount < routingTable.size()) { largerThanDesiredRoutingTableCount++; @@ -154,22 +156,22 @@ public class LargeClusterRoutingTableBuilderTest { _largeClusterRoutingTableBuilder.computeOnExternalViewChange(tableName, externalView, instanceConfigs); - List<Map<String, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables(); + List<Map<ServerInstance, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables(); - Map<String, Integer> segmentCountPerServer = new HashMap<>(); + Map<ServerInstance, Integer> segmentCountPerServer = new HashMap<>(); // Count number of segments assigned per server - for (Map<String, List<String>> routingTable : routingTables) { - for (Map.Entry<String, List<String>> entry : routingTable.entrySet()) { - String serverName = entry.getKey(); - Integer numSegmentsForServer = segmentCountPerServer.get(serverName); + for (Map<ServerInstance, List<String>> routingTable : routingTables) { + for (Map.Entry<ServerInstance, List<String>> entry : routingTable.entrySet()) { + ServerInstance serverInstance = entry.getKey(); + Integer numSegmentsForServer = segmentCountPerServer.get(serverInstance); if (numSegmentsForServer == null) { numSegmentsForServer = 0; } numSegmentsForServer += entry.getValue().size(); - segmentCountPerServer.put(serverName, numSegmentsForServer); + segmentCountPerServer.put(serverInstance, numSegmentsForServer); } } @@ -253,9 +255,9 @@ public class LargeClusterRoutingTableBuilderTest { ExternalView externalView, List<InstanceConfig> instanceConfigs, String tableName) { _largeClusterRoutingTableBuilder.computeOnExternalViewChange(tableName, externalView, instanceConfigs); - List<Map<String, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables(); + List<Map<ServerInstance, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables(); - for (Map<String, List<String>> routingTable : routingTables) { + for (Map<ServerInstance, List<String>> routingTable : routingTables) { assertTrue(routingTableValidator.isRoutingTableValid(routingTable, externalView, instanceConfigs), message); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java index fc1c713..3ee0820 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java @@ -30,6 +30,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.LLCSegmentName; @@ -135,13 +136,13 @@ public class LowLevelConsumerRoutingTableBuilderTest { long startTime = System.nanoTime(); routingTableBuilder.computeOnExternalViewChange("table_REALTIME", externalView, instanceConfigs); - List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); + List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); long endTime = System.nanoTime(); totalNanos += endTime - startTime; // Check that all routing tables generated match all segments, with no duplicates - for (Map<String, List<String>> routingTable : routingTables) { + for (Map<ServerInstance, List<String>> routingTable : routingTables) { Set<String> assignedSegments = new HashSet<>(); for (List<String> segmentsForServer : routingTable.values()) { @@ -198,8 +199,8 @@ public class LowLevelConsumerRoutingTableBuilderTest { externalView.setState(consumingSegment2, instance2, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); routingTableBuilder.computeOnExternalViewChange(realtimeTableName, externalView, instanceConfigs); - List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); - for (Map<String, List<String>> routingTable : routingTables) { + List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); + for (Map<ServerInstance, List<String>> routingTable : routingTables) { ArrayList<String> segmentsInRoutingTable = new ArrayList<>(); for (List<String> segmentsForServer : routingTable.values()) { segmentsInRoutingTable.addAll(segmentsForServer); @@ -248,22 +249,22 @@ public class LowLevelConsumerRoutingTableBuilderTest { } routingTableBuilder.computeOnExternalViewChange(rawTableName, externalView, instanceConfigs); - List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); - for (Map<String, List<String>> routingTable : routingTables) { + List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables(); + for (Map<ServerInstance, List<String>> routingTable : routingTables) { Assert.assertTrue(routingTable.isEmpty()); } instanceConfig.getRecord().setSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, "false"); routingTableBuilder.computeOnExternalViewChange(rawTableName, externalView, instanceConfigs); routingTables = routingTableBuilder.getRoutingTables(); - for (Map<String, List<String>> routingTable : routingTables) { + for (Map<ServerInstance, List<String>> routingTable : routingTables) { Assert.assertFalse(routingTable.isEmpty()); } instanceConfig.getRecord().setSimpleField(CommonConstants.Helix.QUERIES_DISABLED, "true"); routingTableBuilder.computeOnExternalViewChange(rawTableName, externalView, instanceConfigs); routingTables = routingTableBuilder.getRoutingTables(); - for (Map<String, List<String>> routingTable : routingTables) { + for (Map<ServerInstance, List<String>> routingTable : routingTables) { Assert.assertTrue(routingTable.isEmpty()); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java index 52b2d82..f3f0fa3 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java @@ -41,6 +41,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.pql.parsers.Pql2Compiler; import org.testng.Assert; @@ -116,7 +117,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest { // Check the query that requires to scan all segment. String countStarQuery = "select count(*) from myTable"; - Map<String, List<String>> routingTable = + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); // Check that the number of servers picked are always equal or less than the number of servers @@ -217,10 +218,10 @@ public class PartitionAwareOfflineRoutingTableBuilderTest { // Compute routing table and this should not throw null pointer exception routingTableBuilder.computeOnExternalViewChange(OFFLINE_TABLE_NAME, newExternalView, instanceConfigs); - Set<String> servers = new HashSet<>(); + Set<ServerInstance> servers = new HashSet<>(); for (int i = 0; i < 100; i++) { String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME; - Map<String, List<String>> routingTable = + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); Assert.assertEquals(routingTable.keySet().size(), 1); servers.add(routingTable.keySet().iterator().next()); @@ -278,10 +279,10 @@ public class PartitionAwareOfflineRoutingTableBuilderTest { RoutingTableBuilder routingTableBuilder = buildPartitionAwareOfflineRoutingTableBuilder(fakePropertyStore, tableConfig, externalView, instanceConfigs); - Set<String> servers = new HashSet<>(); + Set<ServerInstance> servers = new HashSet<>(); for (int i = 0; i < 100; i++) { String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME; - Map<String, List<String>> routingTable = + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); Assert.assertEquals(routingTable.keySet().size(), 1); servers.add(routingTable.keySet().iterator().next()); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java index 6274b14..a4ccff4 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java @@ -39,6 +39,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.pql.parsers.Pql2Compiler; @@ -104,7 +105,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest { // Check the query that requires to scan all segment. String countStarQuery = "select count(*) from myTable"; - Map<String, List<String>> routingTable = + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); // Check that all segments are covered exactly for once. @@ -181,7 +182,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest { // Check the query that requires to scan all segment. String countStarQuery = "select count(*) from myTable"; - Map<String, List<String>> routingTable = + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); // Check that all segments are covered exactly for once. @@ -254,10 +255,10 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest { // Compute routing table routingTableBuilder.computeOnExternalViewChange(REALTIME_TABLE_NAME, newExternalView, instanceConfigs); - Set<String> servers = new HashSet<>(); + Set<ServerInstance> servers = new HashSet<>(); for (int i = 0; i < 100; i++) { String countStarQuery = "select count(*) from " + REALTIME_TABLE_NAME; - Map<String, List<String>> routingTable = + Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); Assert.assertEquals(routingTable.keySet().size(), 1); servers.addAll(routingTable.keySet()); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java b/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java index adbe111..e6bda2f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java @@ -144,6 +144,11 @@ public class ServerInstance { return _port; } + public String getInstanceName() { + return toString(); + } + + public ServerInstance withSeq(int seq) { return new ServerInstance(_hostName, _shortHostName, _port, seq); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index c2d7bb3..d0fd717 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -28,6 +28,7 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.DataTable; import org.slf4j.Logger; @@ -55,8 +56,8 @@ public class QueryRouter { } public AsyncQueryResponse submitQuery(long requestId, String rawTableName, - @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs) { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; @@ -64,16 +65,18 @@ public class QueryRouter { Map<Server, InstanceRequest> requestMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - for (Map.Entry<String, List<String>> entry : offlineRoutingTable.entrySet()) { - Server server = new Server(entry.getKey(), TableType.OFFLINE); + for (Map.Entry<ServerInstance, List<String>> entry : offlineRoutingTable.entrySet()) { + ServerInstance serverInstance = entry.getKey(); + Server server = new Server(serverInstance.getHostname(), serverInstance.getPort(), TableType.OFFLINE); InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue()); requestMap.put(server, instanceRequest); } } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - for (Map.Entry<String, List<String>> entry : realtimeRoutingTable.entrySet()) { - Server server = new Server(entry.getKey(), TableType.REALTIME); + for (Map.Entry<ServerInstance, List<String>> entry : realtimeRoutingTable.entrySet()) { + ServerInstance serverInstance = entry.getKey(); + Server server = new Server(serverInstance.getHostname(), serverInstance.getPort(), TableType.REALTIME); InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); requestMap.put(server, instanceRequest); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java index d8849f9..033eb9c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java @@ -53,6 +53,12 @@ public class Server { _tableType = tableType; } + public Server(String hostName, int port, TableType tableType) { + _hostName = hostName; + _port = port; + _tableType = tableType; + } + public String getHostName() { return _hostName; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java index 7ab391a..646431d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ServerInstance; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.datatable.DataTableImplV2; @@ -39,8 +40,8 @@ public class QueryRouterTest { private static final Server OFFLINE_SERVER = new Server(SERVER_INSTANCE_NAME, TableType.OFFLINE); private static final Server REALTIME_SERVER = new Server(SERVER_INSTANCE_NAME, TableType.REALTIME); private static final BrokerRequest BROKER_REQUEST = new BrokerRequest(); - private static final Map<String, List<String>> ROUTING_TABLE = - Collections.singletonMap(SERVER_INSTANCE_NAME, Collections.emptyList()); + private static final Map<ServerInstance, List<String>> ROUTING_TABLE = + Collections.singletonMap(ServerInstance.forInstanceName(SERVER_INSTANCE_NAME), Collections.emptyList()); private QueryRouter _queryRouter; diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java b/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java index 2f65230..80f9536 100644 --- a/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java +++ b/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.configuration.Configuration; +import org.apache.pinot.common.response.ServerInstance; /** @@ -125,10 +126,10 @@ public class PerTableRoutingConfig { * * @return */ - public Map<String, List<String>> buildRequestRoutingMap() { - Map<String, List<String>> resultMap = new HashMap<>(); + public Map<ServerInstance, List<String>> buildRequestRoutingMap() { + Map<ServerInstance, List<String>> resultMap = new HashMap<>(); for (String serverName : _defaultServers) { - resultMap.put(serverName, Collections.singletonList("default")); + resultMap.put(ServerInstance.forInstanceName(serverName), Collections.singletonList("default")); } return resultMap; } diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java index 64d852d..9857f78 100644 --- a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java +++ b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java @@ -96,14 +96,14 @@ public class ScatterGatherImpl implements ScatterGather { ScatterGatherStats scatterGatherStats, Boolean isOfflineTable, BrokerMetrics brokerMetrics) throws InterruptedException { ScatterGatherRequest scatterGatherRequest = scatterGatherRequestContext._request; - Map<String, List<String>> routingTable = scatterGatherRequest.getRoutingTable(); + Map<ServerInstance, List<String>> routingTable = scatterGatherRequest.getRoutingTable(); CountDownLatch requestDispatchLatch = new CountDownLatch(routingTable.size()); // async checkout of connections and then dispatch of request List<SingleRequestHandler> handlers = new ArrayList<>(routingTable.size()); - for (Entry<String, List<String>> entry : routingTable.entrySet()) { - ServerInstance serverInstance = ServerInstance.forInstanceName(entry.getKey()); + for (Entry<ServerInstance, List<String>> entry : routingTable.entrySet()) { + ServerInstance serverInstance = entry.getKey(); String shortServerName = serverInstance.getShortHostName(); if (isOfflineTable != null) { if (isOfflineTable) { diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java index 8ea9ee6..d3c631d 100644 --- a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java +++ b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java @@ -21,6 +21,7 @@ package org.apache.pinot.transport.scattergather; import java.util.List; import java.util.Map; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ServerInstance; /** @@ -35,7 +36,7 @@ public interface ScatterGatherRequest { * * @return Map from server to list of segments */ - Map<String, List<String>> getRoutingTable(); + Map<ServerInstance, List<String>> getRoutingTable(); /** * Get the request to be sent to the server. diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java index bf4422c..ab85a48 100644 --- a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java +++ b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java @@ -388,7 +388,7 @@ public class ScatterGatherPerfClient implements Runnable { public static class SimpleScatterGatherRequest implements ScatterGatherRequest { private final byte[] _brokerRequest; private final long _requestId; - private final Map<String, List<String>> _pgToServersMap; + private final Map<ServerInstance, List<String>> _pgToServersMap; public SimpleScatterGatherRequest(byte[] q, PerTableRoutingConfig routingConfig, long requestId) { _brokerRequest = q; @@ -397,7 +397,7 @@ public class ScatterGatherPerfClient implements Runnable { } @Override - public Map<String, List<String>> getRoutingTable() { + public Map<ServerInstance, List<String>> getRoutingTable() { return _pgToServersMap; } diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java index 6871169..ddd1a8e 100644 --- a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java +++ b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java @@ -66,7 +66,7 @@ public class ScatterGatherTest { NettyServer[] nettyServers = new NettyServer[NUM_SERVERS]; String[] serverNames = new String[NUM_SERVERS]; ServerInstance[] serverInstances = new ServerInstance[NUM_SERVERS]; - Map<String, List<String>> routingTable = new HashMap<>(NUM_SERVERS); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(NUM_SERVERS); for (int i = 0; i < NUM_SERVERS; i++) { int serverPort = BASE_SERVER_PORT + i; @@ -77,7 +77,7 @@ public class ScatterGatherTest { + ServerInstance.NAME_PORT_DELIMITER_FOR_INSTANCE_NAME + serverPort; serverNames[i] = serverName; serverInstances[i] = ServerInstance.forInstanceName(serverName); - routingTable.put(serverName, Collections.singletonList("segment_" + i)); + routingTable.put(serverInstances[i], Collections.singletonList("segment_" + i)); } // Setup client @@ -122,7 +122,7 @@ public class ScatterGatherTest { NettyServer[] nettyServers = new NettyServer[NUM_SERVERS]; String[] serverNames = new String[NUM_SERVERS]; ServerInstance[] serverInstances = new ServerInstance[NUM_SERVERS]; - Map<String, List<String>> routingTable = new HashMap<>(NUM_SERVERS); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(NUM_SERVERS); for (int i = 0; i < NUM_SERVERS; i++) { int serverPort = BASE_SERVER_PORT + i; @@ -139,7 +139,7 @@ public class ScatterGatherTest { + ServerInstance.NAME_PORT_DELIMITER_FOR_INSTANCE_NAME + serverPort; serverNames[i] = serverName; serverInstances[i] = ServerInstance.forInstanceName(serverName); - routingTable.put(serverName, Collections.singletonList("segment_" + i)); + routingTable.put(serverInstances[i], Collections.singletonList("segment_" + i)); } // Setup client @@ -185,7 +185,7 @@ public class ScatterGatherTest { NettyServer[] nettyServers = new NettyServer[NUM_SERVERS]; String[] serverNames = new String[NUM_SERVERS]; ServerInstance[] serverInstances = new ServerInstance[NUM_SERVERS]; - Map<String, List<String>> routingTable = new HashMap<>(NUM_SERVERS); + Map<ServerInstance, List<String>> routingTable = new HashMap<>(NUM_SERVERS); for (int i = 0; i < NUM_SERVERS; i++) { int serverPort = BASE_SERVER_PORT + i; @@ -203,7 +203,7 @@ public class ScatterGatherTest { + ServerInstance.NAME_PORT_DELIMITER_FOR_INSTANCE_NAME + serverPort; serverNames[i] = serverName; serverInstances[i] = ServerInstance.forInstanceName(serverName); - routingTable.put(serverName, Collections.singletonList("segment_" + i)); + routingTable.put(serverInstances[i], Collections.singletonList("segment_" + i)); } // Setup client @@ -257,16 +257,16 @@ public class ScatterGatherTest { } private static class TestScatterGatherRequest implements ScatterGatherRequest { - private final Map<String, List<String>> _routingTable; + private final Map<ServerInstance, List<String>> _routingTable; private final long _timeoutMs; - public TestScatterGatherRequest(Map<String, List<String>> routingTable, long timeoutMs) { + public TestScatterGatherRequest(Map<ServerInstance, List<String>> routingTable, long timeoutMs) { _routingTable = routingTable; _timeoutMs = timeoutMs; } @Override - public Map<String, List<String>> getRoutingTable() { + public Map<ServerInstance, List<String>> getRoutingTable() { return _routingTable; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
