This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch winedepot-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f2baddd5601c03bd6161807e3b148aa54cf0167b Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Mon Aug 19 14:15:21 2019 -0700 [Instance Assignment] De-couple assignment strategy from SegmentAssignment (#4533) For REALTIME segments, we should be able to assign CONSUMING and COMPLETED segments with different strategies, e.g. one could follow replica-group while the other follows balance-number. In order to do so, we need to de-couple the assignment strategy from the segment assignment, so each assignment can use different strategies if necessary. - Change SegmentAssignmentStrategy interface to SegmentAssignment - Add OfflineSegmentAssignment and RealtimeSegmentAssignment - Use InstancePartitions to determine the strategy to use - For REALTIME table, support different strategies for CONSUMING and COMPLETED segments --- .../helix/core/assignment/InstancePartitions.java | 17 ++ ...OfflineBalanceNumSegmentAssignmentStrategy.java | 96 -------- ...flineReplicaGroupSegmentAssignmentStrategy.java | 200 ---------------- .../segment/OfflineSegmentAssignment.java | 260 +++++++++++++++++++++ ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 161 ------------- ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 165 ------------- .../segment/RealtimeSegmentAssignment.java | 218 +++++++++++++++++ ...ignmentStrategy.java => SegmentAssignment.java} | 27 ++- .../segment/SegmentAssignmentFactory.java | 43 ++++ .../segment/SegmentAssignmentStrategyFactory.java | 54 ----- .../assignment/segment/SegmentAssignmentUtils.java | 26 +-- ...PinotInstanceAssignmentRestletResourceTest.java | 2 +- ...flineNonReplicaGroupSegmentAssignmentTest.java} | 43 ++-- ... 
OfflineReplicaGroupSegmentAssignmentTest.java} | 115 ++++----- ...ltimeNonReplicaGroupSegmentAssignmentTest.java} | 65 ++---- ...RealtimeReplicaGroupSegmentAssignmentTest.java} | 67 +++--- 16 files changed, 692 insertions(+), 867 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java index 4ed659e..4060f4b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.helix.ZNRecord; +import org.apache.pinot.common.utils.JsonUtils; /** @@ -64,10 +66,12 @@ public class InstancePartitions { } } + @JsonProperty public String getName() { return _name; } + @JsonProperty public Map<String, List<String>> getPartitionToInstancesMap() { return _partitionToInstancesMap; } @@ -102,4 +106,17 @@ public class InstancePartitions { znRecord.setListFields(_partitionToInstancesMap); return znRecord; } + + public String toJsonString() { + try { + return JsonUtils.objectToString(this); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } + + @Override + public String toString() { + return toJsonString(); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java deleted file mode 100644 index 4bd34b5..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.assignment.segment; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import org.apache.commons.configuration.Configuration; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.utils.InstancePartitionsType; -import org.apache.pinot.common.utils.Pairs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Segment assignment strategy for offline segments that assigns segment to the instance with the least number of - * segments. In case of a tie, assigns to the instance with the smallest index in the list. The strategy ensures that - * replicas of the same segment are not assigned to the same server. 
- * <p>To rebalance a table, use Helix AutoRebalanceStrategy. - */ -public class OfflineBalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy { - private static final Logger LOGGER = LoggerFactory.getLogger(OfflineBalanceNumSegmentAssignmentStrategy.class); - - private HelixManager _helixManager; - private TableConfig _tableConfig; - private String _tableNameWithType; - private int _replication; - - @Override - public void init(HelixManager helixManager, TableConfig tableConfig) { - _helixManager = helixManager; - _tableConfig = tableConfig; - _tableNameWithType = tableConfig.getTableName(); - _replication = tableConfig.getValidationConfig().getReplicationNumber(); - - LOGGER.info("Initialized OfflineBalanceNumSegmentAssignmentStrategy for table: {} with replication: {}", - _tableNameWithType, _replication); - } - - @Override - public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { - List<String> instances = SegmentAssignmentUtils - .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.OFFLINE); - int[] numSegmentsAssignedPerInstance = - SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); - - // Assign the segment to the instance with the least segments, or the smallest id if there is a tie - int numInstances = numSegmentsAssignedPerInstance.length; - PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator()); - for (int instanceId = 0; instanceId < numInstances; instanceId++) { - heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId)); - } - List<String> instancesAssigned = new ArrayList<>(_replication); - for (int i = 0; i < _replication; i++) { - instancesAssigned.add(instances.get(heap.remove().getRight())); - } - - LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, - _tableNameWithType); - return 
instancesAssigned; - } - - @Override - public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, - Configuration config) { - List<String> instances = SegmentAssignmentUtils - .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.OFFLINE); - Map<String, Map<String, String>> newAssignment = - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication); - - LOGGER.info( - "Rebalanced {} segments to instances: {} for table: {} with replication: {}, number of segments to be moved to each instance: {}", - currentAssignment.size(), instances, _tableNameWithType, _replication, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); - return newAssignment; - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java deleted file mode 100644 index 8124d30..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.assignment.segment; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import org.apache.commons.configuration.Configuration; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.config.ReplicaGroupStrategyConfig; -import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; -import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; -import org.apache.pinot.common.utils.InstancePartitionsType; -import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; -import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Segment assignment strategy for offline segments that assigns segment to the instance in the replica with the least - * number of segments. - * <p>Among multiple replicas, always mirror the assignment (pick the same index of the instance). - * <p>Inside each partition, assign the segment to the servers with the least segments already assigned. In case of a - * tie, assign to the server with the smallest index in the list. Do this for one replica and mirror the assignment to - * other replicas. 
- * <p>To rebalance a table, inside each partition, first calculate the number of segments on each server, loop over all - * the segments and keep the assignment if number of segments for the server has not been reached and track the not - * assigned segments, then assign the left-over segments to the servers with the least segments, or the smallest index - * if there is a tie. Repeat the process for all the partitions in one replica, and mirror the assignment to other - * replicas. With this greedy algorithm, the result is deterministic and with minimum segment moves. - */ -public class OfflineReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy { - private static final Logger LOGGER = LoggerFactory.getLogger(OfflineReplicaGroupSegmentAssignmentStrategy.class); - - private HelixManager _helixManager; - private TableConfig _tableConfig; - private String _tableNameWithType; - private String _partitionColumn; - - @Override - public void init(HelixManager helixManager, TableConfig tableConfig) { - _helixManager = helixManager; - _tableConfig = tableConfig; - _tableNameWithType = tableConfig.getTableName(); - ReplicaGroupStrategyConfig strategyConfig = tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); - _partitionColumn = strategyConfig != null ? 
strategyConfig.getPartitionColumn() : null; - - if (_partitionColumn == null) { - LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy for table: {} without partition column", - _tableNameWithType); - } else { - LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy for table: {} with partition column: {}", - _tableNameWithType, _partitionColumn); - } - } - - @Override - public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { - InstancePartitions instancePartitions = InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.OFFLINE); - - // Fetch partition id from segment ZK metadata if partition column is configured - int partitionId = 0; - if (_partitionColumn == null) { - Preconditions.checkState(instancePartitions.getNumPartitions() == 1, - "The instance partitions: %s should contain only 1 partition", instancePartitions.getName()); - } else { - OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider - .getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName); - Preconditions - .checkState(segmentZKMetadata != null, "Failed to fetch segment ZK metadata for table: %s, segment: %s", - _tableNameWithType, segmentName); - // Uniformly spray the segment partitions over the instance partitions - partitionId = getPartitionId(segmentZKMetadata) % instancePartitions.getNumPartitions(); - } - - // First assign the segment to replica 0 - List<String> instances = instancePartitions.getInstances(partitionId, 0); - int[] numSegmentsAssignedPerInstance = - SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); - int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0]; - int instanceIdWithLeastSegmentsAssigned = 0; - int numInstances = numSegmentsAssignedPerInstance.length; - for (int instanceId = 1; instanceId < numInstances; instanceId++) { - if 
(numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned) { - minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId]; - instanceIdWithLeastSegmentsAssigned = instanceId; - } - } - - // Mirror the assignment to all replicas - int numReplicas = instancePartitions.getNumReplicas(); - List<String> instancesAssigned = new ArrayList<>(numReplicas); - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { - instancesAssigned - .add(instancePartitions.getInstances(partitionId, replicaId).get(instanceIdWithLeastSegmentsAssigned)); - } - - if (_partitionColumn == null) { - LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, - _tableNameWithType); - } else { - LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} for table: {}", segmentName, partitionId, - instancesAssigned, _tableNameWithType); - } - return instancesAssigned; - } - - @Override - public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, - Configuration config) { - InstancePartitions instancePartitions = InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.OFFLINE); - if (_partitionColumn == null) { - return rebalanceTableWithoutPartition(currentAssignment, instancePartitions); - } else { - return rebalanceTableWithPartition(currentAssignment, instancePartitions); - } - } - - private Map<String, Map<String, String>> rebalanceTableWithoutPartition( - Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) { - Preconditions.checkState(instancePartitions.getNumPartitions() == 1, - "The instance partitions: %s should contain only 1 partition", instancePartitions.getName()); - - Map<String, Map<String, String>> newAssignment = new TreeMap<>(); - SegmentAssignmentUtils - .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, currentAssignment.keySet(), - 
newAssignment); - - LOGGER.info( - "Rebalanced {} segments with instance partitions: {} for table: {} without partition column, number of segments to be moved to each instance: {}", - currentAssignment.size(), instancePartitions.getPartitionToInstancesMap(), _tableNameWithType, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); - return newAssignment; - } - - private Map<String, Map<String, String>> rebalanceTableWithPartition( - Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) { - // Fetch partition id from segment ZK metadata - List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider - .getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(), _tableNameWithType); - Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new HashMap<>(); - for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { - segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata); - } - Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); - for (String segmentName : currentAssignment.keySet()) { - int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName)); - partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName); - } - - Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils - .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap); - - LOGGER.info( - "Rebalanced {} segments with instance partitions: {} for table: {} with partition column: {}, number of segments to be moved to each instance: {}", - currentAssignment.size(), instancePartitions.getPartitionToInstancesMap(), _tableNameWithType, _partitionColumn, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); - return newAssignment; - } - - private int getPartitionId(OfflineSegmentZKMetadata 
segmentZKMetadata) { - String segmentName = segmentZKMetadata.getSegmentName(); - ColumnPartitionMetadata partitionMetadata = - segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn); - Preconditions.checkState(partitionMetadata != null, - "Segment ZK metadata for table: %s, segment: %s does not contain partition metadata for column: %s", - _tableNameWithType, segmentName, _partitionColumn); - Set<Integer> partitions = partitionMetadata.getPartitions(); - Preconditions.checkState(partitions.size() == 1, - "Segment ZK metadata for table: %s, segment: %s contains multiple partitions for column: %s", - _tableNameWithType, segmentName, _partitionColumn); - return partitions.iterator().next(); - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java new file mode 100644 index 0000000..5dfb034 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeMap; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.ReplicaGroupStrategyConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; +import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.Pairs; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Segment assignment for offline table. + * <ul> + * <li> + * Non-replica-group based assignment (only 1 replica in instance partitions): + * <p>Assign the segment to the instance with the least number of segments. In case of a tie, assign the segment to + * the instance with the smallest index in the list. Use Helix AutoRebalanceStrategy to rebalance the table. + * </li> + * <li> + * Replica-group based assignment (more than 1 replicas in instance partitions): + * <p>Among replicas, always mirror the assignment (pick the same index of the instance). + * <p>Within each partition, assign the segment to the servers with the least segments already assigned. In case of + * a tie, assign to the server with the smallest index in the list. Do this for one replica and mirror the + * assignment to other replicas. 
+ * <p>To rebalance a table, within each partition, first calculate the number of segments on each server, loop over + * all the segments and keep the assignment if number of segments for the server has not been reached and track the + * not assigned segments, then assign the left-over segments to the servers with the least segments, or the smallest + * index if there is a tie. Repeat the process for all the partitions in one replica, and mirror the assignment to + * other replicas. With this greedy algorithm, the result is deterministic and with minimum segment moves. + * </li> + * </ul> + */ +public class OfflineSegmentAssignment implements SegmentAssignment { + private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentAssignment.class); + + private HelixManager _helixManager; + private String _offlineTableName; + private int _replication; + private String _partitionColumn; + + @Override + public void init(HelixManager helixManager, TableConfig tableConfig) { + _helixManager = helixManager; + _offlineTableName = tableConfig.getTableName(); + _replication = tableConfig.getValidationConfig().getReplicationNumber(); + ReplicaGroupStrategyConfig strategyConfig = tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); + _partitionColumn = strategyConfig != null ? 
strategyConfig.getPartitionColumn() : null; + + if (_partitionColumn == null) { + LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} without partition column for table: {} ", + _replication, _offlineTableName); + } else { + LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} and partition column: {} for table: {}", + _replication, _partitionColumn, _offlineTableName); + } + } + + @Override + public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); + Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s", + _offlineTableName); + LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions, + _offlineTableName); + + List<String> instancesAssigned; + if (instancePartitions.getNumReplicas() == 1) { + // Non-replica-group based assignment + + // Assign the segment to the instance with the least segments, or the smallest id if there is a tie + List<String> instances = + SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication); + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); + int numInstances = numSegmentsAssignedPerInstance.length; + PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator()); + for (int instanceId = 0; instanceId < numInstances; instanceId++) { + heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId)); + } + instancesAssigned = new ArrayList<>(_replication); + for (int i = 0; i < _replication; i++) { + instancesAssigned.add(instances.get(heap.remove().getRight())); + } + } else { + // 
Replica-group based assignment + + int numReplicas = instancePartitions.getNumReplicas(); + if (numReplicas != _replication) { + LOGGER.warn( + "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + instancePartitions.getName(), numReplicas, _replication, _offlineTableName, numReplicas); + } + + // Fetch partition id from segment ZK metadata if partition column is configured + int partitionId; + if (_partitionColumn == null) { + LOGGER.info("Assigning segment: {} without partition column for table: {}", segmentName, _offlineTableName); + + Preconditions.checkState(instancePartitions.getNumPartitions() == 1, + "Instance partitions: %s should contain 1 partition without partition column", + instancePartitions.getName()); + partitionId = 0; + } else { + LOGGER.info("Assigning segment: {} with partition column: {} for table: {}", segmentName, _partitionColumn, + _offlineTableName); + + OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider + .getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _offlineTableName, segmentName); + Preconditions + .checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s", + segmentName, _offlineTableName); + int segmentPartitionId = getPartitionId(segmentZKMetadata); + + // Uniformly spray the segment partitions over the instance partitions + int numPartitions = instancePartitions.getNumPartitions(); + partitionId = segmentPartitionId % numPartitions; + LOGGER.info("Assigning segment: {} with partition id: {} to partition: {}/{} for table: {}", segmentName, + segmentPartitionId, partitionId, numPartitions, _offlineTableName); + } + + // First assign the segment to replica 0 + List<String> instances = instancePartitions.getInstances(partitionId, 0); + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); + int 
minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0]; + int instanceIdWithLeastSegmentsAssigned = 0; + int numInstances = numSegmentsAssignedPerInstance.length; + for (int instanceId = 1; instanceId < numInstances; instanceId++) { + if (numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned) { + minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId]; + instanceIdWithLeastSegmentsAssigned = instanceId; + } + } + + // Mirror the assignment to all replicas + instancesAssigned = new ArrayList<>(numReplicas); + for (int replicaId = 0; replicaId < numReplicas; replicaId++) { + instancesAssigned + .add(instancePartitions.getInstances(partitionId, replicaId).get(instanceIdWithLeastSegmentsAssigned)); + } + } + + LOGGER + .info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, _offlineTableName); + return instancesAssigned; + } + + @Override + public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) { + InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); + Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s", + _offlineTableName); + LOGGER.info("Rebalancing table: {} with instance partitions: {}", _offlineTableName, instancePartitions); + + Map<String, Map<String, String>> newAssignment; + if (instancePartitions.getNumReplicas() == 1) { + // Non-replica-group based assignment + + List<String> instances = + SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication); + newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication); + } else { + // Replica-group based assignment + + int numReplicas = instancePartitions.getNumReplicas(); + if 
(numReplicas != _replication) { + LOGGER.warn( + "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + instancePartitions.getName(), numReplicas, _replication, _offlineTableName, numReplicas); + } + + if (_partitionColumn == null) { + LOGGER.info("Rebalancing table: {} without partition column", _offlineTableName); + Preconditions.checkState(instancePartitions.getNumPartitions() == 1, + "Instance partitions: %s should contain 1 partition without partition column", + instancePartitions.getName()); + newAssignment = new TreeMap<>(); + SegmentAssignmentUtils + .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, currentAssignment.keySet(), + newAssignment); + } else { + LOGGER.info("Rebalancing table: {} with partition column: {}", _offlineTableName, _partitionColumn); + newAssignment = rebalanceTableWithPartition(currentAssignment, instancePartitions); + } + } + + LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", _offlineTableName, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + return newAssignment; + } + + private Map<String, Map<String, String>> rebalanceTableWithPartition( + Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) { + // Fetch partition id from segment ZK metadata + List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider + .getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(), _offlineTableName); + Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new HashMap<>(); + for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata); + } + Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); + for (String segmentName : currentAssignment.keySet()) { + int partitionId = 
getPartitionId(segmentZKMetadataMap.get(segmentName)); + partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName); + } + + return SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap); + } + + private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) { + String segmentName = segmentZKMetadata.getSegmentName(); + ColumnPartitionMetadata partitionMetadata = + segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn); + Preconditions.checkState(partitionMetadata != null, + "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s", + segmentName, _offlineTableName, _partitionColumn); + Set<Integer> partitions = partitionMetadata.getPartitions(); + Preconditions.checkState(partitions.size() == 1, + "Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s", segmentName, + _offlineTableName, _partitionColumn); + return partitions.iterator().next(); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java deleted file mode 100644 index 3319ad7..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.assignment.segment; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; -import org.apache.pinot.common.utils.InstancePartitionsType; -import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Segment assignment strategy for LLC real-time segments without replica-group. - * <ul> - * <li> - * For the CONSUMING segments, it is very similar to replica-group based segment assignment with the following - * differences: - * <ul> - * <li> - * 1. Within a replica, all segments of a partition (steam partition) always exist in exactly one one server - * </li> - * <li> - * 2. Partition id for an instance is derived from the index of the instance, instead of explicitly stored in - * the instance partitions - * </li> - * <li> - * 3. In addition to the ONLINE segments, there are also CONSUMING segments to be assigned - * </li> - * </ul> - * Since within a replica, each partition contains only one server, we can directly assign or rebalance the - * CONSUMING segments to the servers based on the partition id. 
- * <p>The strategy does not minimize segment movements for CONSUMING segments because within a replica, the server - * is fixed for each partition. The instance assignment is responsible for keeping minimum changes to the instance - * partitions to reduce the number of segments need to be moved. - * </li> - * <li> - * For the COMPLETED segments, rebalance segments the same way as OfflineBalanceNumSegmentAssignmentStrategy. - * </li> - * </ul> - */ -public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy { - private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeBalanceNumSegmentAssignmentStrategy.class); - - private HelixManager _helixManager; - private TableConfig _tableConfig; - private String _tableNameWithType; - private int _replication; - - @Override - public void init(HelixManager helixManager, TableConfig tableConfig) { - _helixManager = helixManager; - _tableConfig = tableConfig; - _tableNameWithType = tableConfig.getTableName(); - _replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); - - LOGGER.info("Initialized RealtimeBalanceNumSegmentAssignmentStrategy for table: {} with replication: {}", - _tableNameWithType, _replication); - } - - @Override - public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { - List<String> instances = SegmentAssignmentUtils - .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.CONSUMING); - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); - List<String> instancesAssigned = getInstances(instances, partitionId); - LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} for table: {}", segmentName, partitionId, - instancesAssigned, _tableNameWithType); - return instancesAssigned; - } - - @Override - public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, - 
Configuration config) { - SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = - new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment); - - // Rebalance COMPLETED segments first - Map<String, Map<String, String>> completedSegmentAssignment = - completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment(); - List<String> instancesForCompletedSegments = SegmentAssignmentUtils - .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.COMPLETED); - Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils - .rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, instancesForCompletedSegments, - _replication); - - // Rebalance CONSUMING segments if needed - Map<String, Map<String, String>> consumingSegmentAssignment = - completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment(); - if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, - RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { - List<String> instancesForConsumingSegments = SegmentAssignmentUtils - .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, - InstancePartitionsType.CONSUMING); - for (String segmentName : consumingSegmentAssignment.keySet()) { - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); - List<String> instancesAssigned = getInstances(instancesForConsumingSegments, partitionId); - Map<String, String> instanceStateMap = SegmentAssignmentUtils - .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); - newAssignment.put(segmentName, instanceStateMap); - } - LOGGER.info( - "Rebalanced {} COMPLETED segments to instances: {} and {} CONSUMING segments to instances: {} for table: {} with replication: {}, number of segments to be moved to each instances: {}", - completedSegmentAssignment.size(), 
instancesForCompletedSegments, consumingSegmentAssignment.size(), - instancesForConsumingSegments, _tableNameWithType, _replication, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); - } else { - LOGGER.info( - "Rebalanced {} COMPLETED segments to instances: {} for table: {} with replication: {}, number of segments to be moved to each instance: {}", - completedSegmentAssignment.size(), instancesForCompletedSegments, _tableNameWithType, _replication, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment, newAssignment)); - newAssignment.putAll(consumingSegmentAssignment); - } - - // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the OFFLINE - // segments and re-assign them - newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment()); - - return newAssignment; - } - - /** - * Returns the instances for the given partition id for CONSUMING segments. - * <p>Uniformly spray the partitions and replicas across the instances. - * <p>E.g. 
(6 servers, 3 partitions, 4 replicas) - * <pre> - * "0_0": [i0, i1, i2, i3, i4, i5 ] - * p0r0, p0r1, p0r2, p1r3, p1r0, p1r1 - * p1r2, p1r3, p2r0, p2r1, p2r2, p2r3 - * </pre> - */ - private List<String> getInstances(List<String> instances, int partitionId) { - List<String> instancesAssigned = new ArrayList<>(_replication); - for (int replicaId = 0; replicaId < _replication; replicaId++) { - instancesAssigned.add(instances.get((partitionId * _replication + replicaId) % instances.size())); - } - return instancesAssigned; - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java deleted file mode 100644 index 05febc8..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.controller.helix.core.assignment.segment; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.commons.configuration.Configuration; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.utils.CommonConstants; -import org.apache.pinot.common.utils.InstancePartitionsType; -import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; -import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Segment assignment strategy for LLC real-time segments (both consuming and completed). - * <p>It is very similar to replica-group based segment assignment with the following differences: - * <ul> - * <li>1. Inside one replica, each partition (stream partition) always contains one server</li> - * <li> - * 2. Partition id for an instance is derived from the index of the instance in the replica group, instead of - * explicitly stored in the instance partitions - * </li> - * <li>3. In addition to the ONLINE segments, there are also CONSUMING segments to be assigned</li> - * </ul> - * <p> - * Since each partition contains only one server (in one replica), we can directly assign or rebalance segments to the - * servers based on the partition id. - * <p> - * The real-time segment assignment does not minimize segment moves because the server is fixed for each partition in - * each replica. The instance assignment is responsible for keeping minimum changes to the instance partitions to - * reduce the number of segments need to be moved. 
- */ -public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy { - private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeReplicaGroupSegmentAssignmentStrategy.class); - - private HelixManager _helixManager; - private TableConfig _tableConfig; - private String _tableNameWithType; - - @Override - public void init(HelixManager helixManager, TableConfig tableConfig) { - _helixManager = helixManager; - _tableConfig = tableConfig; - _tableNameWithType = tableConfig.getTableName(); - - LOGGER.info("Initialized RealtimeReplicaGroupSegmentAssignmentStrategy for table: {}", _tableNameWithType); - } - - @Override - public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { - InstancePartitions instancePartitions = InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.CONSUMING); - Preconditions.checkState(instancePartitions.getNumPartitions() == 1, - "The instance partitions: %s should contain only 1 partition", instancePartitions.getName()); - - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); - List<String> instancesAssigned = getInstances(instancePartitions, partitionId); - LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} for table: {}", segmentName, partitionId, - instancesAssigned, _tableNameWithType); - return instancesAssigned; - } - - @Override - public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, - Configuration config) { - SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = - new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment); - - // Rebalance COMPLETED segments first - Map<String, Map<String, String>> completedSegmentAssignment = - completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment(); - 
InstancePartitions instancePartitionsForCompletedSegments = InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED); - Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); - for (String segmentName : completedSegmentAssignment.keySet()) { - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); - partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName); - } - Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils - .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, instancePartitionsForCompletedSegments, - partitionIdToSegmentsMap); - - // Rebalance CONSUMING segments if needed - Map<String, Map<String, String>> consumingSegmentAssignment = - completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment(); - if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, - RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { - InstancePartitions instancePartitionsForConsumingSegments = InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.CONSUMING); - for (String segmentName : consumingSegmentAssignment.keySet()) { - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); - List<String> instancesAssigned = getInstances(instancePartitionsForConsumingSegments, partitionId); - Map<String, String> instanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, - CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING); - newAssignment.put(segmentName, instanceStateMap); - } - LOGGER.info( - "Rebalanced {} COMPLETED segments with instance partitions: {} and {} CONSUMING segments with instance partitions: {} for table: {}, number of segments to be moved to each instances: {}", - completedSegmentAssignment.size(), 
instancePartitionsForCompletedSegments.getPartitionToInstancesMap(), - consumingSegmentAssignment.size(), instancePartitionsForConsumingSegments.getPartitionToInstancesMap(), - _tableNameWithType, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); - } else { - LOGGER.info( - "Rebalanced {} COMPLETED segments with instance partitions: {} for table: {}, number of segments to be moved to each instance: {}", - completedSegmentAssignment.size(), instancePartitionsForCompletedSegments.getPartitionToInstancesMap(), - _tableNameWithType, - SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment, newAssignment)); - newAssignment.putAll(consumingSegmentAssignment); - } - - // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the OFFLINE - // segments and re-assign them - newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment()); - - return newAssignment; - } - - /** - * Returns the instances for the given partition id for CONSUMING segments. - * <p>Within a replica, uniformly spray the partitions across the instances. - * <p>E.g. 
(within a replica, 3 servers, 6 partitions) - * <pre> - * "0_0": [i0, i1, i2] - * p0 p1 p2 - * p3 p4 p5 - * </pre> - */ - private List<String> getInstances(InstancePartitions instancePartitions, int partitionId) { - int numReplicas = instancePartitions.getNumReplicas(); - List<String> instancesAssigned = new ArrayList<>(numReplicas); - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { - List<String> instances = instancePartitions.getInstances(0, replicaId); - instancesAssigned.add(instances.get(partitionId % instances.size())); - } - return instancesAssigned; - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java new file mode 100644 index 0000000..ab279fb --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Segment assignment for LLC real-time table. + * <ul> + * <li> + * For the CONSUMING segments, it is very similar to replica-group based segment assignment with the following + * differences: + * <ul> + * <li> + * 1. Within a replica, all segments of the same partition (stream partition) are always assigned to exactly one + * server, and because of that we can directly assign or rebalance the CONSUMING segments to the servers based + * on the partition id + * </li> + * <li> + * 2. Partition id for an instance is derived from the index of the instance (within the replica-group for + * replica-group based assignment), instead of explicitly stored in the instance partitions + * </li> + * </ul> + * </li> + * <li> + * For the COMPLETED segments, rebalance segments the same way as OfflineSegmentAssignment.
+ * </li> + * </ul> + */ +public class RealtimeSegmentAssignment implements SegmentAssignment { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentAssignment.class); + + private String _realtimeTableName; + private int _replication; + + @Override + public void init(HelixManager helixManager, TableConfig tableConfig) { + _realtimeTableName = tableConfig.getTableName(); + _replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); + + LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} for table: {}", _replication, + _realtimeTableName); + } + + @Override + public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); + Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", + _realtimeTableName); + Preconditions + .checkState(instancePartitions.getNumPartitions() == 1, "Instance partitions: %s should contain 1 partition", + instancePartitions.getName()); + LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions, + _realtimeTableName); + + List<String> instancesAssigned = assignSegment(segmentName, instancePartitions); + LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, + _realtimeTableName); + return instancesAssigned; + } + + /** + * Helper method to assign instances based on the segment partition id and instance partitions. 
+ */ + private List<String> assignSegment(String segmentName, InstancePartitions instancePartitions) { + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + + if (instancePartitions.getNumReplicas() == 1) { + // Non-replica-group based assignment: + // Uniformly spray the partitions and replicas across the instances. + // E.g. (6 servers, 3 partitions, 4 replicas) + // "0_0": [i0, i1, i2, i3, i4, i5 ] + // p0r0 p0r1 p0r2 p0r3 p1r0 p1r1 + // p1r2 p1r3 p2r0 p2r1 p2r2 p2r3 + + List<String> instances = + SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication); + int numInstances = instances.size(); + List<String> instancesAssigned = new ArrayList<>(_replication); + for (int replicaId = 0; replicaId < _replication; replicaId++) { + instancesAssigned.add(instances.get((partitionId * _replication + replicaId) % numInstances)); + } + return instancesAssigned; + } else { + // Replica-group based assignment: + // Within a replica, uniformly spray the partitions across the instances. + // E.g.
(within a replica, 3 servers, 6 partitions) + // "0_0": [i0, i1, i2] + // p0 p1 p2 + // p3 p4 p5 + + int numReplicas = instancePartitions.getNumReplicas(); + if (numReplicas != _replication) { + LOGGER.warn( + "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + instancePartitions.getName(), numReplicas, _replication, _realtimeTableName, numReplicas); + } + + List<String> instancesAssigned = new ArrayList<>(numReplicas); + for (int replicaId = 0; replicaId < numReplicas; replicaId++) { + List<String> instances = instancePartitions.getInstances(0, replicaId); + instancesAssigned.add(instances.get(partitionId % instances.size())); + } + return instancesAssigned; + } + } + + @Override + public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) { + SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = + new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment); + + // Rebalance COMPLETED segments first + Map<String, Map<String, String>> completedSegmentAssignment = + completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment(); + InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); + Preconditions + .checkState(completedInstancePartitions != null, "Failed to find COMPLETED instance partitions for table: %s", + _realtimeTableName); + LOGGER.info("Rebalancing COMPLETED segments for table: {} with instance partitions: {}", _realtimeTableName, + completedInstancePartitions); + + Map<String, Map<String, String>> newAssignment; + if (completedInstancePartitions.getNumReplicas() == 1) { + // Non-replica-group based assignment + + List<String> instances = SegmentAssignmentUtils + 
.getInstancesForNonReplicaGroupBasedAssignment(completedInstancePartitions, _replication); + newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, instances, _replication); + } else { + // Replica-group based assignment + + int numReplicas = completedInstancePartitions.getNumReplicas(); + if (numReplicas != _replication) { + LOGGER.warn( + "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + completedInstancePartitions.getName(), numReplicas, _replication, _realtimeTableName, numReplicas); + } + + Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); + for (String segmentName : completedSegmentAssignment.keySet()) { + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName); + } + newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, completedInstancePartitions, + partitionIdToSegmentsMap); + } + + // Rebalance CONSUMING segments if configured + Map<String, Map<String, String>> consumingSegmentAssignment = + completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment(); + if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, + RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { + InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); + Preconditions + .checkState(consumingInstancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", + _realtimeTableName); + Preconditions.checkState(consumingInstancePartitions.getNumPartitions() == 1, + "Instance partitions: %s should contain 1 partition", consumingInstancePartitions.getName()); + LOGGER.info("Rebalancing CONSUMING segments for table: {} with instance partitions: {}", _realtimeTableName, + 
consumingInstancePartitions); + + for (String segmentName : consumingSegmentAssignment.keySet()) { + List<String> instancesAssigned = assignSegment(segmentName, consumingInstancePartitions); + Map<String, String> instanceStateMap = SegmentAssignmentUtils + .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + newAssignment.put(segmentName, instanceStateMap); + } + } else { + newAssignment.putAll(consumingSegmentAssignment); + } + + // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the OFFLINE + // segments and re-assign them + newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment()); + + LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", _realtimeTableName, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + return newAssignment; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java similarity index 59% rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java index de2ea0d..fd613ff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java @@ -23,15 +23,21 @@ import java.util.Map; import org.apache.commons.configuration.Configuration; import org.apache.helix.HelixManager; import org.apache.pinot.common.config.TableConfig; +import 
org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; /** - * Strategy to assign segment to instances or rebalance all segments in a table. + * Interface for segment assignment and table rebalance. + * <p> + * TODO: Add SegmentAssignmentStrategy interface and support custom segment assignment strategy (e.g. cost based segment + * assignment). SegmentAssignmentStrategy should not be coupled with SegmentAssignment, and SegmentAssignment + * should be able to choose the segment assignment strategy based on the configuration. */ -public interface SegmentAssignmentStrategy { +public interface SegmentAssignment { /** - * Initializes the segment assignment strategy. + * Initializes the segment assignment. * * @param helixManager Helix manager * @param tableConfig Table config @@ -39,21 +45,24 @@ public interface SegmentAssignmentStrategy { void init(HelixManager helixManager, TableConfig tableConfig); /** - * Assigns a new segment. + * Assigns segment to instances. * * @param segmentName Name of the segment to be assigned * @param currentAssignment Current segment assignment of the table (map from segment name to instance state map) - * @return List of servers to assign the segment to + * @param instancePartitionsMap Map from type (OFFLINE|CONSUMING|COMPLETED) to instance partitions + * @return List of instances to assign the segment to */ - List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment); + List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap); /** - * Rebalances the segments for a table. + * Rebalances the segment assignment for a table. 
* * @param currentAssignment Current segment assignment of the table (map from segment name to instance state map) + * @param instancePartitionsMap Map from type (OFFLINE|CONSUMING|COMPLETED) to instance partitions * @param config Configuration for the rebalance - * @return the rebalanced assignment for the segments + * @return Rebalanced assignment for the segments */ Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, - Configuration config); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java new file mode 100644 index 0000000..db531bb --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; + + +/** + * Factory for the {@link SegmentAssignment}. + */ +public class SegmentAssignmentFactory { + private SegmentAssignmentFactory() { + } + + public static SegmentAssignment getSegmentAssignment(HelixManager helixManager, TableConfig tableConfig) { + SegmentAssignment segmentAssignment; + if (tableConfig.getTableType() == TableType.OFFLINE) { + segmentAssignment = new OfflineSegmentAssignment(); + } else { + segmentAssignment = new RealtimeSegmentAssignment(); + } + segmentAssignment.init(helixManager, tableConfig); + return segmentAssignment; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java deleted file mode 100644 index 875b856..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.assignment.segment; - -import org.apache.helix.HelixManager; -import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; -import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy; - - -/** - * Factory for the {@link SegmentAssignmentStrategy}. - */ -public class SegmentAssignmentStrategyFactory { - private SegmentAssignmentStrategyFactory() { - } - - public static SegmentAssignmentStrategy getSegmentAssignmentStrategy(HelixManager helixManager, - TableConfig tableConfig) { - SegmentAssignmentStrategy segmentAssignmentStrategy; - if (AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY - .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy())) { - if (tableConfig.getTableType() == TableType.OFFLINE) { - segmentAssignmentStrategy = new OfflineReplicaGroupSegmentAssignmentStrategy(); - } else { - segmentAssignmentStrategy = new RealtimeReplicaGroupSegmentAssignmentStrategy(); - } - } else { - if (tableConfig.getTableType() == TableType.OFFLINE) { - segmentAssignmentStrategy = new OfflineBalanceNumSegmentAssignmentStrategy(); - } else { - segmentAssignmentStrategy = new RealtimeBalanceNumSegmentAssignmentStrategy(); - } - } - segmentAssignmentStrategy.init(helixManager, tableConfig); - return segmentAssignmentStrategy; - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 0f3fe6f..38d9681 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -27,15 +27,11 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; -import org.apache.helix.HelixManager; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; -import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; -import org.apache.pinot.common.utils.InstancePartitionsType; import org.apache.pinot.common.utils.Pairs; import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; -import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; /** @@ -73,23 +69,23 @@ class SegmentAssignmentUtils { } /** - * Returns the instances for the balance number segment assignment strategy. + * Returns instances for non-replica-group based assignment. 
*/ - static List<String> getInstancesForBalanceNumStrategy(HelixManager helixManager, TableConfig tableConfig, - int replication, InstancePartitionsType instancePartitionsType) { - InstancePartitions instancePartitions = - InstancePartitionsUtils.fetchOrComputeInstancePartitions(helixManager, tableConfig, instancePartitionsType); - Preconditions.checkArgument(instancePartitions.getNumPartitions() == 1 && instancePartitions.getNumReplicas() == 1, - "The instance partitions: %s should contain only 1 partition and 1 replica", instancePartitions.getName()); + static List<String> getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions instancePartitions, + int replication) { + Preconditions.checkState(instancePartitions.getNumReplicas() == 1 && instancePartitions.getNumPartitions() == 1, + "Instance partitions: %s should contain 1 replica and 1 partition for non-replica-group based assignment", + instancePartitions.getName()); List<String> instances = instancePartitions.getInstances(0, 0); - Preconditions.checkState(instances.size() >= replication, - "There are less instances: %d than the replication: %d for table: %s", instances.size(), replication, - tableConfig.getTableName()); + int numInstances = instances.size(); + Preconditions.checkState(numInstances >= replication, + "There are less instances: %s in instance partitions: %s than the table replication: %s", numInstances, + instancePartitions.getName(), replication); return instances; } /** - * Rebalances the table with Helix AutoRebalanceStrategy for the balance number segment assignment strategy. + * Rebalances the table with Helix AutoRebalanceStrategy. 
*/ static Map<String, Map<String, String>> rebalanceTableWithHelixAutoRebalanceStrategy( Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java index 9d6e7d7..004ff1f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java @@ -283,7 +283,7 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { // Post the CONSUMING instance partitions instancePartitionsMap = deserializeInstancePartitionsMap( sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null), - JsonUtils.objectToString(consumingInstancePartitions))); + consumingInstancePartitions.toJsonString())); assertEquals(instancePartitionsMap.size(), 1); assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0), Collections.singletonList(realtimeInstanceId)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java similarity index 76% rename from pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java rename to pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java index 762773b..c455274 100644 --- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java @@ -19,14 +19,11 @@ package org.apache.pinot.controller.helix.core.assignment.segment; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.InstancePartitionsType; @@ -34,16 +31,11 @@ import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class OfflineBalanceNumSegmentAssignmentStrategyTest { +public class OfflineNonReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; private static final String SEGMENT_NAME_PREFIX = "segment_"; private static final int NUM_SEGMENTS = 100; @@ -57,33 +49,26 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest { private static final String INSTANCE_PARTITIONS_NAME = InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME); - private SegmentAssignmentStrategy 
_strategy; + private SegmentAssignment _segmentAssignment; + private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap; @BeforeClass public void setUp() { + TableConfig tableConfig = + new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); + _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig); + // { // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9] // } InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME); instancePartitions.setInstances(0, 0, INSTANCES); - - // Mock HelixManager - @SuppressWarnings("unchecked") - ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); - when(propertyStore - .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME)), any(), - anyInt())).thenReturn(instancePartitions.toZNRecord()); - HelixManager helixManager = mock(HelixManager.class); - when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); - - TableConfig tableConfig = - new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); - _strategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, tableConfig); + _instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions); } @Test public void testFactory() { - assertTrue(_strategy instanceof OfflineBalanceNumSegmentAssignmentStrategy); + assertTrue(_segmentAssignment instanceof OfflineSegmentAssignment); } @Test @@ -98,7 +83,8 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest { // ... 
int expectedAssignedInstanceId = 0; for (String segmentName : SEGMENTS) { - List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = + _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); @@ -113,7 +99,8 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest { public void testTableBalanced() { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (String segmentName : SEGMENTS) { - List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = + _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); } @@ -132,6 +119,6 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals(_strategy.rebalanceTable(currentAssignment, null), currentAssignment); + assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, null), currentAssignment); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java similarity index 86% rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java rename to pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java index 28586f0..089c016 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java @@ -51,7 +51,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class OfflineReplicaGroupSegmentAssignmentStrategyTest { +public class OfflineReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; private static final String SEGMENT_NAME_PREFIX = "segment_"; private static final int NUM_SEGMENTS = 90; @@ -72,11 +72,20 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { private static final String PARTITION_COLUMN = "partitionColumn"; private static final int NUM_PARTITIONS = 3; - private SegmentAssignmentStrategy _strategyWithoutPartition; - private SegmentAssignmentStrategy _strategyWithPartition; + private SegmentAssignment _segmentAssignmentWithoutPartition; + private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMapWithoutPartition; + private SegmentAssignment _segmentAssignmentWithPartition; + private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMapWithPartition; @BeforeClass public void setUp() { + TableConfig tableConfigWithoutPartitions = + new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION) + .setNumReplicas(NUM_REPLICAS) + .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build(); + _segmentAssignmentWithoutPartition = + 
SegmentAssignmentFactory.getSegmentAssignment(null, tableConfigWithoutPartitions); + // { // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5], // 0_1=[instance_6, instance_7, instance_8, instance_9, instance_10, instance_11], @@ -93,54 +102,12 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { } instancePartitionsWithoutPartition.setInstances(0, replicaId, instancesForReplica); } - - // Mock HelixManager - @SuppressWarnings("unchecked") - ZkHelixPropertyStore<ZNRecord> propertyStoreWithoutPartitions = mock(ZkHelixPropertyStore.class); - when(propertyStoreWithoutPartitions.get(eq(ZKMetadataProvider - .constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION)), any(), anyInt())) - .thenReturn(instancePartitionsWithoutPartition.toZNRecord()); - HelixManager helixManagerWithoutPartitions = mock(HelixManager.class); - when(helixManagerWithoutPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithoutPartitions); - - TableConfig tableConfigWithoutPartitions = - new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION) - .setNumReplicas(NUM_REPLICAS) - .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build(); - _strategyWithoutPartition = SegmentAssignmentStrategyFactory - .getSegmentAssignmentStrategy(helixManagerWithoutPartitions, tableConfigWithoutPartitions); - - // { - // 0_0=[instance_0, instance_1], - // 0_1=[instance_6, instance_7], - // 0_2=[instance_12, instance_13], - // 1_0=[instance_2, instance_3], - // 1_1=[instance_8, instance_9], - // 1_2=[instance_14, instance_15], - // 2_0=[instance_4, instance_5], - // 2_1=[instance_10, instance_11], - // 2_2=[instance_16, instance_17] - // } - InstancePartitions instancePartitionsWithPartition = - new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION); - int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS; - instanceIdToAdd = 0; - for 
(int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) { - List<String> instancesForPartition = new ArrayList<>(numInstancesPerPartition); - for (int i = 0; i < numInstancesPerPartition; i++) { - instancesForPartition.add(INSTANCES.get(instanceIdToAdd++)); - } - instancePartitionsWithPartition.setInstances(partitionId, replicaId, instancesForPartition); - } - } + _instancePartitionsMapWithoutPartition = + Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitionsWithoutPartition); // Mock HelixManager @SuppressWarnings("unchecked") ZkHelixPropertyStore<ZNRecord> propertyStoreWithPartitions = mock(ZkHelixPropertyStore.class); - when(propertyStoreWithPartitions.get( - eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION)), - any(), anyInt())).thenReturn(instancePartitionsWithPartition.toZNRecord()); List<ZNRecord> segmentZKMetadataZNRecords = new ArrayList<>(NUM_SEGMENTS); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = SEGMENTS.get(segmentId); @@ -168,14 +135,41 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { .setNumReplicas(NUM_REPLICAS) .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build(); tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig); - _strategyWithPartition = SegmentAssignmentStrategyFactory - .getSegmentAssignmentStrategy(helixManagerWithPartitions, tableConfigWithPartitions); + _segmentAssignmentWithPartition = + SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions, tableConfigWithPartitions); + + // { + // 0_0=[instance_0, instance_1], + // 0_1=[instance_6, instance_7], + // 0_2=[instance_12, instance_13], + // 1_0=[instance_2, instance_3], + // 1_1=[instance_8, instance_9], + // 1_2=[instance_14, instance_15], + // 2_0=[instance_4, 
instance_5], + // 2_1=[instance_10, instance_11], + // 2_2=[instance_16, instance_17] + // } + InstancePartitions instancePartitionsWithPartition = + new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION); + int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS; + instanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) { + List<String> instancesForPartition = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instancesForPartition.add(INSTANCES.get(instanceIdToAdd++)); + } + instancePartitionsWithPartition.setInstances(partitionId, replicaId, instancesForPartition); + } + } + _instancePartitionsMapWithPartition = + Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitionsWithPartition); } @Test public void testFactory() { - assertTrue(_strategyWithoutPartition instanceof OfflineReplicaGroupSegmentAssignmentStrategy); - assertTrue(_strategyWithPartition instanceof OfflineReplicaGroupSegmentAssignmentStrategy); + assertTrue(_segmentAssignmentWithoutPartition instanceof OfflineSegmentAssignment); + assertTrue(_segmentAssignmentWithPartition instanceof OfflineSegmentAssignment); } @Test @@ -184,7 +178,8 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = SEGMENTS.get(segmentId); - List<String> instancesAssigned = _strategyWithoutPartition.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = _segmentAssignmentWithoutPartition + .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition); assertEquals(instancesAssigned.size(), NUM_REPLICAS); for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { @@ -212,7 +207,8 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest { int numInstancesPerPartition = numInstancesPerReplica / NUM_PARTITIONS; for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = SEGMENTS.get(segmentId); - List<String> instancesAssigned = _strategyWithPartition.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = _segmentAssignmentWithPartition + .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition); assertEquals(instancesAssigned.size(), NUM_REPLICAS); int partitionId = segmentId % NUM_PARTITIONS; for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { @@ -240,7 +236,8 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { public void testTableBalancedWithoutPartition() { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (String segmentName : SEGMENTS) { - List<String> instancesAssigned = _strategyWithoutPartition.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = _segmentAssignmentWithoutPartition + .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition); currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); } @@ -259,14 +256,16 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals(_strategyWithoutPartition.rebalanceTable(currentAssignment, null), currentAssignment); + assertEquals(_segmentAssignmentWithoutPartition + .rebalanceTable(currentAssignment, _instancePartitionsMapWithoutPartition, null), currentAssignment); } @Test public void testTableBalancedWithPartition() { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (String 
segmentName : SEGMENTS) { - List<String> instancesAssigned = _strategyWithPartition.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = _segmentAssignmentWithPartition + .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition); currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); } @@ -285,6 +284,8 @@ public class OfflineReplicaGroupSegmentAssignmentStrategyTest { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals(_strategyWithPartition.rebalanceTable(currentAssignment, null), currentAssignment); + assertEquals( + _segmentAssignmentWithPartition.rebalanceTable(currentAssignment, _instancePartitionsMapWithPartition, null), + currentAssignment); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java similarity index 80% rename from pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java rename to pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 0aa454c..5fee01a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -23,11 +23,7 @@ import java.util.List; import java.util.Map; import 
java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; -import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.InstancePartitionsType; @@ -37,16 +33,11 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConst import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class RealtimeBalanceNumSegmentAssignmentStrategyTest { +public class RealtimeNonReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; private static final int NUM_PARTITIONS = 4; private static final int NUM_SEGMENTS = 100; @@ -65,7 +56,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME); private List<String> _segments; - private SegmentAssignmentStrategy _strategy; + private SegmentAssignment _segmentAssignment; + private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap; @BeforeClass public void setUp() { @@ -75,7 +67,13 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { System.currentTimeMillis()).getSegmentName()); } - // Consuming instances: + TableConfig tableConfig = + new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME) + 
.setNumReplicas(NUM_REPLICAS).setLLC(true).build(); + _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig); + + _instancePartitionsMap = new TreeMap<>(); + // CONSUMING instances: // { // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8] // } @@ -83,35 +81,20 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { // p3r0 p3r1 p3r2 InstancePartitions consumingInstancePartitions = new InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME); consumingInstancePartitions.setInstances(0, 0, CONSUMING_INSTANCES); + _instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions); - // Completed instances: + // COMPLETED instances: // { // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9] // } InstancePartitions completedInstancePartitions = new InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME); completedInstancePartitions.setInstances(0, 0, COMPLETED_INSTANCES); - - // Mock HelixManager - @SuppressWarnings("unchecked") - ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); - when(propertyStore - .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)), - any(), anyInt())).thenReturn(consumingInstancePartitions.toZNRecord()); - when(propertyStore - .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)), - any(), anyInt())).thenReturn(completedInstancePartitions.toZNRecord()); - HelixManager helixManager = mock(HelixManager.class); - when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); - - TableConfig tableConfig = - new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME) - .setNumReplicas(NUM_REPLICAS).setLLC(true).build(); - _strategy = 
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, tableConfig); + _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions); } @Test public void testFactory() { - assertTrue(_strategy instanceof RealtimeBalanceNumSegmentAssignmentStrategy); + assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment); } @Test @@ -119,7 +102,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); - List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = + _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { @@ -143,7 +127,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); - List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = + _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); addToAssignment(currentAssignment, segmentId, instancesAssigned); } @@ -154,20 +139,20 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { assertEquals(instanceStateMap.size(), NUM_REPLICAS); } - // Rebalance should relocate all completed (ONLINE) segments to the completed instances + // Rebalance should relocate all COMPLETED (ONLINE) segments to the COMPLETED instances Map<String, Map<String, String>> newAssignment = - _strategy.rebalanceTable(currentAssignment, new BaseConfiguration()); + 
_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, new BaseConfiguration()); assertEquals(newAssignment.size(), NUM_SEGMENTS); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { - // Completed (ONLINE) segments + // COMPLETED (ONLINE) segments Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); } } else { - // Consuming (CONSUMING) segments + // CONSUMING segments Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); @@ -184,10 +169,10 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance); } - // Rebalance all segments (both completed and consuming) should give the same assignment + // Rebalance all segments (including CONSUMING) should give the same assignment BaseConfiguration config = new BaseConfiguration(); config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true); - assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, config), newAssignment); // Rebalance should not change the assignment for the OFFLINE segments String offlineSegmentName = "offlineSegment"; @@ -196,7 +181,7 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { RealtimeSegmentOnlineOfflineStateModel.OFFLINE); currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); 
- assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, config), newAssignment); } private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java similarity index 82% rename from pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java rename to pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index f74a7b2..04fc344 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -24,11 +24,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; -import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy; @@ -39,16 +35,11 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConst import 
org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { +public class RealtimeReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; private static final int NUM_PARTITIONS = 4; private static final int NUM_SEGMENTS = 100; @@ -67,7 +58,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME); private List<String> _segments; - private SegmentAssignmentStrategy _strategy; + private SegmentAssignment _segmentAssignment; + private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap; @BeforeClass public void setUp() { @@ -77,7 +69,14 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { System.currentTimeMillis()).getSegmentName()); } - // Consuming instances: + TableConfig tableConfig = + new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) + .setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) + .build(); + _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig); + + _instancePartitionsMap = new TreeMap<>(); + // CONSUMING instances: // { // 0_0=[instance_0, instance_1, instance_2], // 0_1=[instance_3, instance_4, instance_5], @@ -95,8 +94,9 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { } consumingInstancePartitions.setInstances(0, replicaId, consumingInstancesForReplica); } + _instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions); - // Completed 
instances: + // COMPLETED instances: // { // 0_0=[instance_0, instance_1, instance_2, instance_3], // 0_1=[instance_4, instance_5, instance_6, instance_7], @@ -112,29 +112,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { } completedInstancePartitions.setInstances(0, replicaId, completedInstancesForReplica); } - - // Mock HelixManager - @SuppressWarnings("unchecked") - ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); - when(propertyStore - .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)), - any(), anyInt())).thenReturn(consumingInstancePartitions.toZNRecord()); - when(propertyStore - .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)), - any(), anyInt())).thenReturn(completedInstancePartitions.toZNRecord()); - HelixManager helixManager = mock(HelixManager.class); - when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); - - TableConfig tableConfig = - new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) - .build(); - _strategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, tableConfig); + _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions); } @Test public void testFactory() { - assertTrue(_strategy instanceof RealtimeReplicaGroupSegmentAssignmentStrategy); + assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment); } @Test @@ -143,7 +126,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); - List<String> instancesAssigned = 
_strategy.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = + _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { @@ -167,7 +151,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); - List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + List<String> instancesAssigned = + _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); addToAssignment(currentAssignment, segmentId, instancesAssigned); } @@ -178,20 +163,20 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { assertEquals(instanceStateMap.size(), NUM_REPLICAS); } - // Rebalance should relocate all completed (ONLINE) segments to the completed instances + // Rebalance should relocate all COMPLETED (ONLINE) segments to the COMPLETED instances Map<String, Map<String, String>> newAssignment = - _strategy.rebalanceTable(currentAssignment, new BaseConfiguration()); + _segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, new BaseConfiguration()); assertEquals(newAssignment.size(), NUM_SEGMENTS); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { - // Completed (ONLINE) segments + // COMPLETED (ONLINE) segments Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); } } else { - // Consuming (CONSUMING) segments + // 
CONSUMING segments Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); @@ -207,10 +192,10 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); - // Rebalance all segments (both completed and consuming) should give the same assignment + // Rebalance all segments (including CONSUMING) should give the same assignment BaseConfiguration config = new BaseConfiguration(); config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true); - assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, config), newAssignment); // Rebalance should not change the assignment for the OFFLINE segments String offlineSegmentName = "offlineSegment"; @@ -219,7 +204,7 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { RealtimeSegmentOnlineOfflineStateModel.OFFLINE); currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); - assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, _instancePartitionsMap, config), newAssignment); } private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
