This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0d69302 [Instance Assignment] De-couple assignment strategy from
SegmentAssignment (#4533)
0d69302 is described below
commit 0d69302007bc5c452c0c9dd9b2389a4582cb84bd
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Aug 19 14:15:21 2019 -0700
[Instance Assignment] De-couple assignment strategy from SegmentAssignment
(#4533)
For REALTIME segments, we should be able to assign CONSUMING and
COMPLETED segments with different strategies, e.g. one could
follow replica-group while the other follows balance-number. In
order to do so, we need to de-couple the assignment strategy from
the segment assignment, so each assignment can use different
strategies if necessary.
- Change SegmentAssignmentStrategy interface to SegmentAssignment
- Add OfflineSegmentAssignment and RealtimeSegmentAssignment
- Use InstancePartitions to determine the strategy to use
- For REALTIME table, support different strategies for CONSUMING
and COMPLETED segments
---
.../helix/core/assignment/InstancePartitions.java | 17 ++
...OfflineBalanceNumSegmentAssignmentStrategy.java | 96 --------
...flineReplicaGroupSegmentAssignmentStrategy.java | 200 ----------------
.../segment/OfflineSegmentAssignment.java | 260 +++++++++++++++++++++
...ealtimeBalanceNumSegmentAssignmentStrategy.java | 161 -------------
...ltimeReplicaGroupSegmentAssignmentStrategy.java | 165 -------------
.../segment/RealtimeSegmentAssignment.java | 218 +++++++++++++++++
...ignmentStrategy.java => SegmentAssignment.java} | 27 ++-
.../segment/SegmentAssignmentFactory.java | 43 ++++
.../segment/SegmentAssignmentStrategyFactory.java | 54 -----
.../assignment/segment/SegmentAssignmentUtils.java | 26 +--
...PinotInstanceAssignmentRestletResourceTest.java | 2 +-
...flineNonReplicaGroupSegmentAssignmentTest.java} | 43 ++--
... OfflineReplicaGroupSegmentAssignmentTest.java} | 115 ++++-----
...ltimeNonReplicaGroupSegmentAssignmentTest.java} | 65 ++----
...RealtimeReplicaGroupSegmentAssignmentTest.java} | 67 +++---
16 files changed, 692 insertions(+), 867 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
index 4ed659e..4060f4b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
@@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.utils.JsonUtils;
/**
@@ -64,10 +66,12 @@ public class InstancePartitions {
}
}
+ @JsonProperty
public String getName() {
return _name;
}
+ @JsonProperty
public Map<String, List<String>> getPartitionToInstancesMap() {
return _partitionToInstancesMap;
}
@@ -102,4 +106,17 @@ public class InstancePartitions {
znRecord.setListFields(_partitionToInstancesMap);
return znRecord;
}
+
+ public String toJsonString() {
+ try {
+ return JsonUtils.objectToString(this);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJsonString();
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
deleted file mode 100644
index 4bd34b5..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.common.utils.Pairs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for offline segments that assigns segment to
the instance with the least number of
- * segments. In case of a tie, assigns to the instance with the smallest index
in the list. The strategy ensures that
- * replicas of the same segment are not assigned to the same server.
- * <p>To rebalance a table, use Helix AutoRebalanceStrategy.
- */
-public class OfflineBalanceNumSegmentAssignmentStrategy implements
SegmentAssignmentStrategy {
- private static final Logger LOGGER =
LoggerFactory.getLogger(OfflineBalanceNumSegmentAssignmentStrategy.class);
-
- private HelixManager _helixManager;
- private TableConfig _tableConfig;
- private String _tableNameWithType;
- private int _replication;
-
- @Override
- public void init(HelixManager helixManager, TableConfig tableConfig) {
- _helixManager = helixManager;
- _tableConfig = tableConfig;
- _tableNameWithType = tableConfig.getTableName();
- _replication = tableConfig.getValidationConfig().getReplicationNumber();
-
- LOGGER.info("Initialized OfflineBalanceNumSegmentAssignmentStrategy for
table: {} with replication: {}",
- _tableNameWithType, _replication);
- }
-
- @Override
- public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment) {
- List<String> instances = SegmentAssignmentUtils
- .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig,
_replication, InstancePartitionsType.OFFLINE);
- int[] numSegmentsAssignedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment,
instances);
-
- // Assign the segment to the instance with the least segments, or the
smallest id if there is a tie
- int numInstances = numSegmentsAssignedPerInstance.length;
- PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances,
Pairs.intPairComparator());
- for (int instanceId = 0; instanceId < numInstances; instanceId++) {
- heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId],
instanceId));
- }
- List<String> instancesAssigned = new ArrayList<>(_replication);
- for (int i = 0; i < _replication; i++) {
- instancesAssigned.add(instances.get(heap.remove().getRight()));
- }
-
- LOGGER.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
- _tableNameWithType);
- return instancesAssigned;
- }
-
- @Override
- public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
- Configuration config) {
- List<String> instances = SegmentAssignmentUtils
- .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig,
_replication, InstancePartitionsType.OFFLINE);
- Map<String, Map<String, String>> newAssignment =
-
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
instances, _replication);
-
- LOGGER.info(
- "Rebalanced {} segments to instances: {} for table: {} with
replication: {}, number of segments to be moved to each instance: {}",
- currentAssignment.size(), instances, _tableNameWithType, _replication,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
- return newAssignment;
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
deleted file mode 100644
index 8124d30..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
-import
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for offline segments that assigns segment to
the instance in the replica with the least
- * number of segments.
- * <p>Among multiple replicas, always mirror the assignment (pick the same
index of the instance).
- * <p>Inside each partition, assign the segment to the servers with the least
segments already assigned. In case of a
- * tie, assign to the server with the smallest index in the list. Do this for
one replica and mirror the assignment to
- * other replicas.
- * <p>To rebalance a table, inside each partition, first calculate the number
of segments on each server, loop over all
- * the segments and keep the assignment if number of segments for the server
has not been reached and track the not
- * assigned segments, then assign the left-over segments to the servers with
the least segments, or the smallest index
- * if there is a tie. Repeat the process for all the partitions in one
replica, and mirror the assignment to other
- * replicas. With this greedy algorithm, the result is deterministic and with
minimum segment moves.
- */
-public class OfflineReplicaGroupSegmentAssignmentStrategy implements
SegmentAssignmentStrategy {
- private static final Logger LOGGER =
LoggerFactory.getLogger(OfflineReplicaGroupSegmentAssignmentStrategy.class);
-
- private HelixManager _helixManager;
- private TableConfig _tableConfig;
- private String _tableNameWithType;
- private String _partitionColumn;
-
- @Override
- public void init(HelixManager helixManager, TableConfig tableConfig) {
- _helixManager = helixManager;
- _tableConfig = tableConfig;
- _tableNameWithType = tableConfig.getTableName();
- ReplicaGroupStrategyConfig strategyConfig =
tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
- _partitionColumn = strategyConfig != null ?
strategyConfig.getPartitionColumn() : null;
-
- if (_partitionColumn == null) {
- LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy
for table: {} without partition column",
- _tableNameWithType);
- } else {
- LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy
for table: {} with partition column: {}",
- _tableNameWithType, _partitionColumn);
- }
- }
-
- @Override
- public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment) {
- InstancePartitions instancePartitions = InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixManager, _tableConfig,
InstancePartitionsType.OFFLINE);
-
- // Fetch partition id from segment ZK metadata if partition column is
configured
- int partitionId = 0;
- if (_partitionColumn == null) {
- Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
- "The instance partitions: %s should contain only 1 partition",
instancePartitions.getName());
- } else {
- OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
- .getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
_tableNameWithType, segmentName);
- Preconditions
- .checkState(segmentZKMetadata != null, "Failed to fetch segment ZK
metadata for table: %s, segment: %s",
- _tableNameWithType, segmentName);
- // Uniformly spray the segment partitions over the instance partitions
- partitionId = getPartitionId(segmentZKMetadata) %
instancePartitions.getNumPartitions();
- }
-
- // First assign the segment to replica 0
- List<String> instances = instancePartitions.getInstances(partitionId, 0);
- int[] numSegmentsAssignedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment,
instances);
- int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0];
- int instanceIdWithLeastSegmentsAssigned = 0;
- int numInstances = numSegmentsAssignedPerInstance.length;
- for (int instanceId = 1; instanceId < numInstances; instanceId++) {
- if (numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned)
{
- minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId];
- instanceIdWithLeastSegmentsAssigned = instanceId;
- }
- }
-
- // Mirror the assignment to all replicas
- int numReplicas = instancePartitions.getNumReplicas();
- List<String> instancesAssigned = new ArrayList<>(numReplicas);
- for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
- instancesAssigned
- .add(instancePartitions.getInstances(partitionId,
replicaId).get(instanceIdWithLeastSegmentsAssigned));
- }
-
- if (_partitionColumn == null) {
- LOGGER.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
- _tableNameWithType);
- } else {
- LOGGER.info("Assigned segment: {} with partition id: {} to instances: {}
for table: {}", segmentName, partitionId,
- instancesAssigned, _tableNameWithType);
- }
- return instancesAssigned;
- }
-
- @Override
- public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
- Configuration config) {
- InstancePartitions instancePartitions = InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixManager, _tableConfig,
InstancePartitionsType.OFFLINE);
- if (_partitionColumn == null) {
- return rebalanceTableWithoutPartition(currentAssignment,
instancePartitions);
- } else {
- return rebalanceTableWithPartition(currentAssignment,
instancePartitions);
- }
- }
-
- private Map<String, Map<String, String>> rebalanceTableWithoutPartition(
- Map<String, Map<String, String>> currentAssignment, InstancePartitions
instancePartitions) {
- Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
- "The instance partitions: %s should contain only 1 partition",
instancePartitions.getName());
-
- Map<String, Map<String, String>> newAssignment = new TreeMap<>();
- SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedPartition(currentAssignment,
instancePartitions, 0, currentAssignment.keySet(),
- newAssignment);
-
- LOGGER.info(
- "Rebalanced {} segments with instance partitions: {} for table: {}
without partition column, number of segments to be moved to each instance: {}",
- currentAssignment.size(),
instancePartitions.getPartitionToInstancesMap(), _tableNameWithType,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
- return newAssignment;
- }
-
- private Map<String, Map<String, String>> rebalanceTableWithPartition(
- Map<String, Map<String, String>> currentAssignment, InstancePartitions
instancePartitions) {
- // Fetch partition id from segment ZK metadata
- List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider
-
.getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(),
_tableNameWithType);
- Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new
HashMap<>();
- for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
- segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(),
segmentZKMetadata);
- }
- Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
- for (String segmentName : currentAssignment.keySet()) {
- int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
- partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new
HashSet<>()).add(segmentName);
- }
-
- Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
instancePartitions, partitionIdToSegmentsMap);
-
- LOGGER.info(
- "Rebalanced {} segments with instance partitions: {} for table: {}
with partition column: {}, number of segments to be moved to each instance: {}",
- currentAssignment.size(),
instancePartitions.getPartitionToInstancesMap(), _tableNameWithType,
_partitionColumn,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
- return newAssignment;
- }
-
- private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) {
- String segmentName = segmentZKMetadata.getSegmentName();
- ColumnPartitionMetadata partitionMetadata =
-
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn);
- Preconditions.checkState(partitionMetadata != null,
- "Segment ZK metadata for table: %s, segment: %s does not contain
partition metadata for column: %s",
- _tableNameWithType, segmentName, _partitionColumn);
- Set<Integer> partitions = partitionMetadata.getPartitions();
- Preconditions.checkState(partitions.size() == 1,
- "Segment ZK metadata for table: %s, segment: %s contains multiple
partitions for column: %s",
- _tableNameWithType, segmentName, _partitionColumn);
- return partitions.iterator().next();
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
new file mode 100644
index 0000000..5dfb034
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Segment assignment for offline table.
+ * <ul>
+ * <li>
+ * Non-replica-group based assignment (only 1 replica in instance
partitions):
+ * <p>Assign the segment to the instance with the least number of
segments. In case of a tie, assign the segment to
+ * the instance with the smallest index in the list. Use Helix
AutoRebalanceStrategy to rebalance the table.
+ * </li>
+ * <li>
+ * Replica-group based assignment (more than 1 replica in instance
partitions):
+ * <p>Among replicas, always mirror the assignment (pick the same index of
the instance).
+ * <p>Within each partition, assign the segment to the servers with the
least segments already assigned. In case of
+ * a tie, assign to the server with the smallest index in the list. Do
this for one replica and mirror the
+ * assignment to other replicas.
+ * <p>To rebalance a table, within each partition, first calculate the
number of segments on each server, loop over
+ * all the segments and keep the assignment if number of segments for the
server has not been reached and track the
+ * not assigned segments, then assign the left-over segments to the
servers with the least segments, or the smallest
+ * index if there is a tie. Repeat the process for all the partitions in
one replica, and mirror the assignment to
+ * other replicas. With this greedy algorithm, the result is deterministic
and with minimum segment moves.
+ * </li>
+ * </ul>
+ */
+public class OfflineSegmentAssignment implements SegmentAssignment {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OfflineSegmentAssignment.class);
+
+ private HelixManager _helixManager;
+ private String _offlineTableName;
+ private int _replication;
+ private String _partitionColumn;
+
+ @Override
+ public void init(HelixManager helixManager, TableConfig tableConfig) {
+ _helixManager = helixManager;
+ _offlineTableName = tableConfig.getTableName();
+ _replication = tableConfig.getValidationConfig().getReplicationNumber();
+ ReplicaGroupStrategyConfig strategyConfig =
tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
+ _partitionColumn = strategyConfig != null ?
strategyConfig.getPartitionColumn() : null;
+
+ if (_partitionColumn == null) {
+ LOGGER.info("Initialized OfflineSegmentAssignment with replication: {}
without partition column for table: {} ",
+ _replication, _offlineTableName);
+ } else {
+ LOGGER.info("Initialized OfflineSegmentAssignment with replication: {}
and partition column: {} for table: {}",
+ _replication, _partitionColumn, _offlineTableName);
+ }
+ }
+
+ @Override
+ public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
OFFLINE instance partitions for table: %s",
+ _offlineTableName);
+ LOGGER.info("Assigning segment: {} with instance partitions: {} for table:
{}", segmentName, instancePartitions,
+ _offlineTableName);
+
+ List<String> instancesAssigned;
+ if (instancePartitions.getNumReplicas() == 1) {
+ // Non-replica-group based assignment
+
+ // Assign the segment to the instance with the least segments, or the
smallest id if there is a tie
+ List<String> instances =
+
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
_replication);
+ int[] numSegmentsAssignedPerInstance =
+
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment,
instances);
+ int numInstances = numSegmentsAssignedPerInstance.length;
+ PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances,
Pairs.intPairComparator());
+ for (int instanceId = 0; instanceId < numInstances; instanceId++) {
+ heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId],
instanceId));
+ }
+ instancesAssigned = new ArrayList<>(_replication);
+ for (int i = 0; i < _replication; i++) {
+ instancesAssigned.add(instances.get(heap.remove().getRight()));
+ }
+ } else {
+ // Replica-group based assignment
+
+ int numReplicas = instancePartitions.getNumReplicas();
+ if (numReplicas != _replication) {
+ LOGGER.warn(
+ "Number of replicas in instance partitions {}: {} does not match
replication in table config: {} for table: {}, use: {}",
+ instancePartitions.getName(), numReplicas, _replication,
_offlineTableName, numReplicas);
+ }
+
+ // Fetch partition id from segment ZK metadata if partition column is
configured
+ int partitionId;
+ if (_partitionColumn == null) {
+ LOGGER.info("Assigning segment: {} without partition column for table:
{}", segmentName, _offlineTableName);
+
+ Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
+ "Instance partitions: %s should contain 1 partition without
partition column",
+ instancePartitions.getName());
+ partitionId = 0;
+ } else {
+ LOGGER.info("Assigning segment: {} with partition column: {} for
table: {}", segmentName, _partitionColumn,
+ _offlineTableName);
+
+ OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+
.getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
_offlineTableName, segmentName);
+ Preconditions
+ .checkState(segmentZKMetadata != null, "Failed to find segment ZK
metadata for segment: %s of table: %s",
+ segmentName, _offlineTableName);
+ int segmentPartitionId = getPartitionId(segmentZKMetadata);
+
+ // Uniformly spray the segment partitions over the instance partitions
+ int numPartitions = instancePartitions.getNumPartitions();
+ partitionId = segmentPartitionId % numPartitions;
+ LOGGER.info("Assigning segment: {} with partition id: {} to partition:
{}/{} for table: {}", segmentName,
+ segmentPartitionId, partitionId, numPartitions, _offlineTableName);
+ }
+
+ // First assign the segment to replica 0
+ List<String> instances = instancePartitions.getInstances(partitionId, 0);
+ int[] numSegmentsAssignedPerInstance =
+
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment,
instances);
+ int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0];
+ int instanceIdWithLeastSegmentsAssigned = 0;
+ int numInstances = numSegmentsAssignedPerInstance.length;
+ for (int instanceId = 1; instanceId < numInstances; instanceId++) {
+ if (numSegmentsAssignedPerInstance[instanceId] <
minNumSegmentsAssigned) {
+ minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId];
+ instanceIdWithLeastSegmentsAssigned = instanceId;
+ }
+ }
+
+ // Mirror the assignment to all replicas
+ instancesAssigned = new ArrayList<>(numReplicas);
+ for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+ instancesAssigned
+ .add(instancePartitions.getInstances(partitionId,
replicaId).get(instanceIdWithLeastSegmentsAssigned));
+ }
+ }
+
+ LOGGER
+ .info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned, _offlineTableName);
+ return instancesAssigned;
+ }
+
+ @Override
+ public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
Configuration config) {
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
OFFLINE instance partitions for table: %s",
+ _offlineTableName);
+ LOGGER.info("Rebalancing table: {} with instance partitions: {}",
_offlineTableName, instancePartitions);
+
+ Map<String, Map<String, String>> newAssignment;
+ if (instancePartitions.getNumReplicas() == 1) {
+ // Non-replica-group based assignment
+
+ List<String> instances =
+
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
_replication);
+ newAssignment = SegmentAssignmentUtils
+ .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
instances, _replication);
+ } else {
+ // Replica-group based assignment
+
+ int numReplicas = instancePartitions.getNumReplicas();
+ if (numReplicas != _replication) {
+ LOGGER.warn(
+ "Number of replicas in instance partitions {}: {} does not match
replication in table config: {} for table: {}, use: {}",
+ instancePartitions.getName(), numReplicas, _replication,
_offlineTableName, numReplicas);
+ }
+
+ if (_partitionColumn == null) {
+ LOGGER.info("Rebalancing table: {} without partition column",
_offlineTableName);
+ Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
+ "Instance partitions: %s should contain 1 partition without
partition column",
+ instancePartitions.getName());
+ newAssignment = new TreeMap<>();
+ SegmentAssignmentUtils
+ .rebalanceReplicaGroupBasedPartition(currentAssignment,
instancePartitions, 0, currentAssignment.keySet(),
+ newAssignment);
+ } else {
+ LOGGER.info("Rebalancing table: {} with partition column: {}",
_offlineTableName, _partitionColumn);
+ newAssignment = rebalanceTableWithPartition(currentAssignment,
instancePartitions);
+ }
+ }
+
+ LOGGER.info("Rebalanced table: {}, number of segments to be moved to each
instance: {}", _offlineTableName,
+
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
+ return newAssignment;
+ }
+
+ private Map<String, Map<String, String>> rebalanceTableWithPartition(
+ Map<String, Map<String, String>> currentAssignment, InstancePartitions
instancePartitions) {
+ // Fetch partition id from segment ZK metadata
+ List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider
+
.getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(),
_offlineTableName);
+ Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new
HashMap<>();
+ for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+ segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(),
segmentZKMetadata);
+ }
+ Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
+ for (String segmentName : currentAssignment.keySet()) {
+ int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
+ partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new
HashSet<>()).add(segmentName);
+ }
+
+ return SegmentAssignmentUtils
+ .rebalanceReplicaGroupBasedTable(currentAssignment,
instancePartitions, partitionIdToSegmentsMap);
+ }
+
+ private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ ColumnPartitionMetadata partitionMetadata =
+
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn);
+ Preconditions.checkState(partitionMetadata != null,
+ "Segment ZK metadata for segment: %s of table: %s does not contain
partition metadata for column: %s",
+ segmentName, _offlineTableName, _partitionColumn);
+ Set<Integer> partitions = partitionMetadata.getPartitions();
+ Preconditions.checkState(partitions.size() == 1,
+ "Segment ZK metadata for segment: %s of table: %s contains multiple
partitions for column: %s", segmentName,
+ _offlineTableName, _partitionColumn);
+ return partitions.iterator().next();
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
deleted file mode 100644
index 3319ad7..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for LLC real-time segments without
replica-group.
- * <ul>
- * <li>
- * For the CONSUMING segments, it is very similar to replica-group based
segment assignment with the following
- * differences:
- * <ul>
- * <li>
- * 1. Within a replica, all segments of a partition (steam partition)
always exist in exactly one one server
- * </li>
- * <li>
- * 2. Partition id for an instance is derived from the index of the
instance, instead of explicitly stored in
- * the instance partitions
- * </li>
- * <li>
- * 3. In addition to the ONLINE segments, there are also CONSUMING
segments to be assigned
- * </li>
- * </ul>
- * Since within a replica, each partition contains only one server, we can
directly assign or rebalance the
- * CONSUMING segments to the servers based on the partition id.
- * <p>The strategy does not minimize segment movements for CONSUMING
segments because within a replica, the server
- * is fixed for each partition. The instance assignment is responsible for
keeping minimum changes to the instance
- * partitions to reduce the number of segments need to be moved.
- * </li>
- * <li>
- * For the COMPLETED segments, rebalance segments the same way as
OfflineBalanceNumSegmentAssignmentStrategy.
- * </li>
- * </ul>
- */
-public class RealtimeBalanceNumSegmentAssignmentStrategy implements
SegmentAssignmentStrategy {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeBalanceNumSegmentAssignmentStrategy.class);
-
- private HelixManager _helixManager;
- private TableConfig _tableConfig;
- private String _tableNameWithType;
- private int _replication;
-
- @Override
- public void init(HelixManager helixManager, TableConfig tableConfig) {
- _helixManager = helixManager;
- _tableConfig = tableConfig;
- _tableNameWithType = tableConfig.getTableName();
- _replication =
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-
- LOGGER.info("Initialized RealtimeBalanceNumSegmentAssignmentStrategy for
table: {} with replication: {}",
- _tableNameWithType, _replication);
- }
-
- @Override
- public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment) {
- List<String> instances = SegmentAssignmentUtils
- .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig,
_replication, InstancePartitionsType.CONSUMING);
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
- List<String> instancesAssigned = getInstances(instances, partitionId);
- LOGGER.info("Assigned segment: {} with partition id: {} to instances: {}
for table: {}", segmentName, partitionId,
- instancesAssigned, _tableNameWithType);
- return instancesAssigned;
- }
-
- @Override
- public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
- Configuration config) {
- SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment
completedConsumingOfflineSegmentAssignment =
- new
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
-
- // Rebalance COMPLETED segments first
- Map<String, Map<String, String>> completedSegmentAssignment =
-
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
- List<String> instancesForCompletedSegments = SegmentAssignmentUtils
- .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig,
_replication, InstancePartitionsType.COMPLETED);
- Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
-
.rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment,
instancesForCompletedSegments,
- _replication);
-
- // Rebalance CONSUMING segments if needed
- Map<String, Map<String, String>> consumingSegmentAssignment =
-
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
- if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
- RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
- List<String> instancesForConsumingSegments = SegmentAssignmentUtils
- .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig,
_replication,
- InstancePartitionsType.CONSUMING);
- for (String segmentName : consumingSegmentAssignment.keySet()) {
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
- List<String> instancesAssigned =
getInstances(instancesForConsumingSegments, partitionId);
- Map<String, String> instanceStateMap = SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned,
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
- newAssignment.put(segmentName, instanceStateMap);
- }
- LOGGER.info(
- "Rebalanced {} COMPLETED segments to instances: {} and {} CONSUMING
segments to instances: {} for table: {} with replication: {}, number of
segments to be moved to each instances: {}",
- completedSegmentAssignment.size(), instancesForCompletedSegments,
consumingSegmentAssignment.size(),
- instancesForConsumingSegments, _tableNameWithType, _replication,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
- } else {
- LOGGER.info(
- "Rebalanced {} COMPLETED segments to instances: {} for table: {}
with replication: {}, number of segments to be moved to each instance: {}",
- completedSegmentAssignment.size(), instancesForCompletedSegments,
_tableNameWithType, _replication,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment,
newAssignment));
- newAssignment.putAll(consumingSegmentAssignment);
- }
-
- // Keep the OFFLINE segments not moved, and
RealtimeSegmentValidationManager will periodically detect the OFFLINE
- // segments and re-assign them
-
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
-
- return newAssignment;
- }
-
- /**
- * Returns the instances for the given partition id for CONSUMING segments.
- * <p>Uniformly spray the partitions and replicas across the instances.
- * <p>E.g. (6 servers, 3 partitions, 4 replicas)
- * <pre>
- * "0_0": [i0, i1, i2, i3, i4, i5 ]
- * p0r0, p0r1, p0r2, p1r3, p1r0, p1r1
- * p1r2, p1r3, p2r0, p2r1, p2r2, p2r3
- * </pre>
- */
- private List<String> getInstances(List<String> instances, int partitionId) {
- List<String> instancesAssigned = new ArrayList<>(_replication);
- for (int replicaId = 0; replicaId < _replication; replicaId++) {
- instancesAssigned.add(instances.get((partitionId * _replication +
replicaId) % instances.size()));
- }
- return instancesAssigned;
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
deleted file mode 100644
index 05febc8..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
-import
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
-import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for LLC real-time segments (both consuming and
completed).
- * <p>It is very similar to replica-group based segment assignment with the
following differences:
- * <ul>
- * <li>1. Inside one replica, each partition (stream partition) always
contains one server</li>
- * <li>
- * 2. Partition id for an instance is derived from the index of the
instance in the replica group, instead of
- * explicitly stored in the instance partitions
- * </li>
- * <li>3. In addition to the ONLINE segments, there are also CONSUMING
segments to be assigned</li>
- * </ul>
- * <p>
- * Since each partition contains only one server (in one replica), we can
directly assign or rebalance segments to the
- * servers based on the partition id.
- * <p>
- * The real-time segment assignment does not minimize segment moves because
the server is fixed for each partition in
- * each replica. The instance assignment is responsible for keeping minimum
changes to the instance partitions to
- * reduce the number of segments need to be moved.
- */
-public class RealtimeReplicaGroupSegmentAssignmentStrategy implements
SegmentAssignmentStrategy {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeReplicaGroupSegmentAssignmentStrategy.class);
-
- private HelixManager _helixManager;
- private TableConfig _tableConfig;
- private String _tableNameWithType;
-
- @Override
- public void init(HelixManager helixManager, TableConfig tableConfig) {
- _helixManager = helixManager;
- _tableConfig = tableConfig;
- _tableNameWithType = tableConfig.getTableName();
-
- LOGGER.info("Initialized RealtimeReplicaGroupSegmentAssignmentStrategy for
table: {}", _tableNameWithType);
- }
-
- @Override
- public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment) {
- InstancePartitions instancePartitions = InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixManager, _tableConfig,
InstancePartitionsType.CONSUMING);
- Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
- "The instance partitions: %s should contain only 1 partition",
instancePartitions.getName());
-
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
- List<String> instancesAssigned = getInstances(instancePartitions,
partitionId);
- LOGGER.info("Assigned segment: {} with partition id: {} to instances: {}
for table: {}", segmentName, partitionId,
- instancesAssigned, _tableNameWithType);
- return instancesAssigned;
- }
-
- @Override
- public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
- Configuration config) {
- SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment
completedConsumingOfflineSegmentAssignment =
- new
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
-
- // Rebalance COMPLETED segments first
- Map<String, Map<String, String>> completedSegmentAssignment =
-
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
- InstancePartitions instancePartitionsForCompletedSegments =
InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixManager, _tableConfig,
InstancePartitionsType.COMPLETED);
- Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
- for (String segmentName : completedSegmentAssignment.keySet()) {
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
- partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new
HashSet<>()).add(segmentName);
- }
- Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(completedSegmentAssignment,
instancePartitionsForCompletedSegments,
- partitionIdToSegmentsMap);
-
- // Rebalance CONSUMING segments if needed
- Map<String, Map<String, String>> consumingSegmentAssignment =
-
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
- if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
- RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
- InstancePartitions instancePartitionsForConsumingSegments =
InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixManager, _tableConfig,
InstancePartitionsType.CONSUMING);
- for (String segmentName : consumingSegmentAssignment.keySet()) {
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
- List<String> instancesAssigned =
getInstances(instancePartitionsForConsumingSegments, partitionId);
- Map<String, String> instanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
-
CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
- newAssignment.put(segmentName, instanceStateMap);
- }
- LOGGER.info(
- "Rebalanced {} COMPLETED segments with instance partitions: {} and
{} CONSUMING segments with instance partitions: {} for table: {}, number of
segments to be moved to each instances: {}",
- completedSegmentAssignment.size(),
instancePartitionsForCompletedSegments.getPartitionToInstancesMap(),
- consumingSegmentAssignment.size(),
instancePartitionsForConsumingSegments.getPartitionToInstancesMap(),
- _tableNameWithType,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
- } else {
- LOGGER.info(
- "Rebalanced {} COMPLETED segments with instance partitions: {} for
table: {}, number of segments to be moved to each instance: {}",
- completedSegmentAssignment.size(),
instancePartitionsForCompletedSegments.getPartitionToInstancesMap(),
- _tableNameWithType,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment,
newAssignment));
- newAssignment.putAll(consumingSegmentAssignment);
- }
-
- // Keep the OFFLINE segments not moved, and
RealtimeSegmentValidationManager will periodically detect the OFFLINE
- // segments and re-assign them
-
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
-
- return newAssignment;
- }
-
- /**
- * Returns the instances for the given partition id for CONSUMING segments.
- * <p>Within a replica, uniformly spray the partitions across the instances.
- * <p>E.g. (within a replica, 3 servers, 6 partitions)
- * <pre>
- * "0_0": [i0, i1, i2]
- * p0 p1 p2
- * p3 p4 p5
- * </pre>
- */
- private List<String> getInstances(InstancePartitions instancePartitions, int
partitionId) {
- int numReplicas = instancePartitions.getNumReplicas();
- List<String> instancesAssigned = new ArrayList<>(numReplicas);
- for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
- List<String> instances = instancePartitions.getInstances(0, replicaId);
- instancesAssigned.add(instances.get(partitionId % instances.size()));
- }
- return instancesAssigned;
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
new file mode 100644
index 0000000..ab279fb
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TableConfig;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Segment assignment for LLC real-time table.
+ * <p>For both CONSUMING and COMPLETED segments, the strategy is chosen from the instance partitions of that type:
+ * instance partitions with 1 replica lead to non-replica-group based assignment, otherwise replica-group based
+ * assignment is used. This allows CONSUMING and COMPLETED segments to follow different strategies.
+ * <ul>
+ *   <li>
+ *     For the CONSUMING segments, it is very similar to replica-group based segment assignment with the following
+ *     differences:
+ *     <ul>
+ *       <li>
+ *         1. Within a replica, all segments of the same partition (stream partition) are always assigned to exactly
+ *         one server, and because of that we can directly assign or rebalance the CONSUMING segments to the servers
+ *         based on the partition id
+ *       </li>
+ *       <li>
+ *         2. Partition id for an instance is derived from the index of the instance (within the replica-group for
+ *         replica-group based assignment), instead of explicitly stored in the instance partitions
+ *       </li>
+ *     </ul>
+ *   </li>
+ *   <li>
+ *     For the COMPLETED segments, rebalance segments the same way as OfflineSegmentAssignment.
+ *   </li>
+ *   <li>
+ *     OFFLINE segments are never moved by the rebalance; RealtimeSegmentValidationManager periodically detects and
+ *     re-assigns them.
+ *   </li>
+ * </ul>
+ */
+public class RealtimeSegmentAssignment implements SegmentAssignment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentAssignment.class);
+
+  private String _realtimeTableName;
+  // Replication from the table config; used as-is for non-replica-group based assignment
+  private int _replication;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _realtimeTableName = tableConfig.getTableName();
+    _replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+
+    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} for table: {}", _replication,
+        _realtimeTableName);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>New segments are assigned with the CONSUMING instance partitions, which must exist and contain exactly 1
+   * partition. The current assignment is not consulted.
+   */
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    InstancePartitions instancePartitions = getConsumingInstancePartitions(instancePartitionsMap);
+    LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions,
+        _realtimeTableName);
+
+    List<String> instancesAssigned = assignSegment(segmentName, instancePartitions);
+    LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
+        _realtimeTableName);
+    return instancesAssigned;
+  }
+
+  /**
+   * Returns the CONSUMING instance partitions from the given map.
+   *
+   * @throws IllegalStateException If the CONSUMING instance partitions are missing or do not contain exactly 1
+   *                               partition
+   */
+  private InstancePartitions getConsumingInstancePartitions(
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(consumingInstancePartitions != null,
+        "Failed to find CONSUMING instance partitions for table: %s", _realtimeTableName);
+    Preconditions.checkState(consumingInstancePartitions.getNumPartitions() == 1,
+        "Instance partitions: %s should contain 1 partition", consumingInstancePartitions.getName());
+    return consumingInstancePartitions;
+  }
+
+  /**
+   * Helper method to assign instances based on the segment partition id (parsed from the LLC segment name) and the
+   * given instance partitions.
+   */
+  private List<String> assignSegment(String segmentName, InstancePartitions instancePartitions) {
+    int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+
+    if (instancePartitions.getNumReplicas() == 1) {
+      // Non-replica-group based assignment:
+      // Uniformly spray the partitions and replicas across the instances: replica r of partition p is assigned to
+      // instance (p * replication + r) % numInstances.
+      // E.g. (6 servers, 3 partitions, 4 replicas)
+      // "0_0": [i0,   i1,   i2,   i3,   i4,   i5  ]
+      //         p0r0  p0r1  p0r2  p0r3  p1r0  p1r1
+      //         p1r2  p1r3  p2r0  p2r1  p2r2  p2r3
+
+      List<String> instances =
+          SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
+      int numInstances = instances.size();
+      List<String> instancesAssigned = new ArrayList<>(_replication);
+      for (int replicaId = 0; replicaId < _replication; replicaId++) {
+        instancesAssigned.add(instances.get((partitionId * _replication + replicaId) % numInstances));
+      }
+      return instancesAssigned;
+    } else {
+      // Replica-group based assignment:
+      // Within a replica, uniformly spray the partitions across the instances.
+      // E.g. (within a replica, 3 servers, 6 partitions)
+      // "0_0": [i0, i1, i2]
+      //         p0  p1  p2
+      //         p3  p4  p5
+
+      warnOnReplicationMismatch(instancePartitions);
+      int numReplicas = instancePartitions.getNumReplicas();
+      List<String> instancesAssigned = new ArrayList<>(numReplicas);
+      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+        List<String> instances = instancePartitions.getInstances(0, replicaId);
+        instancesAssigned.add(instances.get(partitionId % instances.size()));
+      }
+      return instancesAssigned;
+    }
+  }
+
+  /**
+   * Logs a warning when the number of replicas in the given (replica-group based) instance partitions does not match
+   * the replication in the table config. The number of replicas from the instance partitions is the one used.
+   */
+  private void warnOnReplicationMismatch(InstancePartitions instancePartitions) {
+    int numReplicas = instancePartitions.getNumReplicas();
+    if (numReplicas != _replication) {
+      LOGGER.warn(
+          "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}",
+          instancePartitions.getName(), numReplicas, _replication, _realtimeTableName, numReplicas);
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+        new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
+
+    // Rebalance COMPLETED segments first
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
+    InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    Preconditions.checkState(completedInstancePartitions != null,
+        "Failed to find COMPLETED instance partitions for table: %s", _realtimeTableName);
+    LOGGER.info("Rebalancing COMPLETED segments for table: {} with instance partitions: {}", _realtimeTableName,
+        completedInstancePartitions);
+    Map<String, Map<String, String>> newAssignment =
+        rebalanceCompletedSegments(completedSegmentAssignment, completedInstancePartitions);
+
+    // Rebalance CONSUMING segments if configured, otherwise keep their current assignment
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
+    if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
+        RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
+      InstancePartitions consumingInstancePartitions = getConsumingInstancePartitions(instancePartitionsMap);
+      LOGGER.info("Rebalancing CONSUMING segments for table: {} with instance partitions: {}", _realtimeTableName,
+          consumingInstancePartitions);
+
+      for (String segmentName : consumingSegmentAssignment.keySet()) {
+        List<String> instancesAssigned = assignSegment(segmentName, consumingInstancePartitions);
+        Map<String, String> instanceStateMap = SegmentAssignmentUtils
+            .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+        newAssignment.put(segmentName, instanceStateMap);
+      }
+    } else {
+      newAssignment.putAll(consumingSegmentAssignment);
+    }
+
+    // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the OFFLINE
+    // segments and re-assign them
+    newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
+    LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", _realtimeTableName,
+        SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment));
+    return newAssignment;
+  }
+
+  /**
+   * Rebalances the COMPLETED segments with the given COMPLETED instance partitions. Non-replica-group based
+   * assignment is used when the instance partitions have 1 replica. Otherwise replica-group based assignment is used,
+   * with segments grouped by the stream partition id parsed from the LLC segment name.
+   */
+  private Map<String, Map<String, String>> rebalanceCompletedSegments(
+      Map<String, Map<String, String>> completedSegmentAssignment, InstancePartitions completedInstancePartitions) {
+    if (completedInstancePartitions.getNumReplicas() == 1) {
+      // Non-replica-group based assignment
+      List<String> instances = SegmentAssignmentUtils
+          .getInstancesForNonReplicaGroupBasedAssignment(completedInstancePartitions, _replication);
+      return SegmentAssignmentUtils
+          .rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, instances, _replication);
+    }
+
+    // Replica-group based assignment
+    warnOnReplicationMismatch(completedInstancePartitions);
+    Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
+    for (String segmentName : completedSegmentAssignment.keySet()) {
+      int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
+    }
+    return SegmentAssignmentUtils
+        .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, completedInstancePartitions,
+            partitionIdToSegmentsMap);
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
similarity index 59%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
rename to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
index de2ea0d..fd613ff 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
@@ -23,15 +23,21 @@ import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
/**
- * Strategy to assign segment to instances or rebalance all segments in a
table.
+ * Interface for segment assignment and table rebalance.
+ * <p>
+ * TODO: Add SegmentAssignmentStrategy interface and support custom segment
assignment strategy (e.g. cost based segment
+ * assignment). SegmentAssignmentStrategy should not be coupled with
SegmentAssignment, and SegmentAssignment
+ * should be able to choose the segment assignment strategy based on the
configuration.
*/
-public interface SegmentAssignmentStrategy {
+public interface SegmentAssignment {
/**
- * Initializes the segment assignment strategy.
+ * Initializes the segment assignment.
*
* @param helixManager Helix manager
* @param tableConfig Table config
@@ -39,21 +45,24 @@ public interface SegmentAssignmentStrategy {
void init(HelixManager helixManager, TableConfig tableConfig);
/**
- * Assigns a new segment.
+ * Assigns segment to instances.
*
* @param segmentName Name of the segment to be assigned
* @param currentAssignment Current segment assignment of the table (map
from segment name to instance state map)
- * @return List of servers to assign the segment to
+ * @param instancePartitionsMap Map from type (OFFLINE|CONSUMING|COMPLETED)
to instance partitions
+ * @return List of instances to assign the segment to
*/
- List<String> assignSegment(String segmentName, Map<String, Map<String,
String>> currentAssignment);
+ List<String> assignSegment(String segmentName, Map<String, Map<String,
String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
/**
- * Rebalances the segments for a table.
+ * Rebalances the segment assignment for a table.
*
* @param currentAssignment Current segment assignment of the table (map
from segment name to instance state map)
+ * @param instancePartitionsMap Map from type (OFFLINE|CONSUMING|COMPLETED)
to instance partitions
* @param config Configuration for the rebalance
- * @return the rebalanced assignment for the segments
+ * @return Rebalanced assignment for the segments
*/
Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String,
String>> currentAssignment,
- Configuration config);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
Configuration config);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
new file mode 100644
index 0000000..db531bb
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+
+
+/**
+ * Factory for the {@link SegmentAssignment}.
+ */
+public class SegmentAssignmentFactory {
+ private SegmentAssignmentFactory() {
+ }
+
+ public static SegmentAssignment getSegmentAssignment(HelixManager
helixManager, TableConfig tableConfig) {
+ SegmentAssignment segmentAssignment;
+ if (tableConfig.getTableType() == TableType.OFFLINE) {
+ segmentAssignment = new OfflineSegmentAssignment();
+ } else {
+ segmentAssignment = new RealtimeSegmentAssignment();
+ }
+ segmentAssignment.init(helixManager, tableConfig);
+ return segmentAssignment;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
deleted file mode 100644
index 875b856..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-import
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
-
-
-/**
- * Factory for the {@link SegmentAssignmentStrategy}.
- */
-public class SegmentAssignmentStrategyFactory {
- private SegmentAssignmentStrategyFactory() {
- }
-
- public static SegmentAssignmentStrategy
getSegmentAssignmentStrategy(HelixManager helixManager,
- TableConfig tableConfig) {
- SegmentAssignmentStrategy segmentAssignmentStrategy;
- if (AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
-
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()))
{
- if (tableConfig.getTableType() == TableType.OFFLINE) {
- segmentAssignmentStrategy = new
OfflineReplicaGroupSegmentAssignmentStrategy();
- } else {
- segmentAssignmentStrategy = new
RealtimeReplicaGroupSegmentAssignmentStrategy();
- }
- } else {
- if (tableConfig.getTableType() == TableType.OFFLINE) {
- segmentAssignmentStrategy = new
OfflineBalanceNumSegmentAssignmentStrategy();
- } else {
- segmentAssignmentStrategy = new
RealtimeBalanceNumSegmentAssignmentStrategy();
- }
- }
- segmentAssignmentStrategy.init(helixManager, tableConfig);
- return segmentAssignmentStrategy;
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 0f3fe6f..38d9681 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -27,15 +27,11 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
-import org.apache.helix.HelixManager;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.pinot.common.config.TableConfig;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
-import org.apache.pinot.common.utils.InstancePartitionsType;
import org.apache.pinot.common.utils.Pairs;
import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
-import
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
/**
@@ -73,23 +69,23 @@ class SegmentAssignmentUtils {
}
/**
- * Returns the instances for the balance number segment assignment strategy.
+ * Returns instances for non-replica-group based assignment.
*/
- static List<String> getInstancesForBalanceNumStrategy(HelixManager
helixManager, TableConfig tableConfig,
- int replication, InstancePartitionsType instancePartitionsType) {
- InstancePartitions instancePartitions =
- InstancePartitionsUtils.fetchOrComputeInstancePartitions(helixManager,
tableConfig, instancePartitionsType);
- Preconditions.checkArgument(instancePartitions.getNumPartitions() == 1 &&
instancePartitions.getNumReplicas() == 1,
- "The instance partitions: %s should contain only 1 partition and 1
replica", instancePartitions.getName());
+ static List<String>
getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions
instancePartitions,
+ int replication) {
+ Preconditions.checkState(instancePartitions.getNumReplicas() == 1 &&
instancePartitions.getNumPartitions() == 1,
+ "Instance partitions: %s should contain 1 replica and 1 partition for
non-replica-group based assignment",
+ instancePartitions.getName());
List<String> instances = instancePartitions.getInstances(0, 0);
- Preconditions.checkState(instances.size() >= replication,
- "There are less instances: %d than the replication: %d for table: %s",
instances.size(), replication,
- tableConfig.getTableName());
+ int numInstances = instances.size();
+ Preconditions.checkState(numInstances >= replication,
+ "There are less instances: %s in instance partitions: %s than the
table replication: %s", numInstances,
+ instancePartitions.getName(), replication);
return instances;
}
/**
- * Rebalances the table with Helix AutoRebalanceStrategy for the balance
number segment assignment strategy.
+ * Rebalances the table with Helix AutoRebalanceStrategy.
*/
static Map<String, Map<String, String>>
rebalanceTableWithHelixAutoRebalanceStrategy(
Map<String, Map<String, String>> currentAssignment, List<String>
instances, int replication) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
index 9d6e7d7..004ff1f 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
@@ -283,7 +283,7 @@ public class PinotInstanceAssignmentRestletResourceTest
extends ControllerTest {
// Post the CONSUMING instance partitions
instancePartitionsMap = deserializeInstancePartitionsMap(
sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
null),
- JsonUtils.objectToString(consumingInstancePartitions)));
+ consumingInstancePartitions.toJsonString()));
assertEquals(instancePartitionsMap.size(), 1);
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
0),
Collections.singletonList(realtimeInstanceId));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
similarity index 76%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
index 762773b..c455274 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
@@ -19,14 +19,11 @@
package org.apache.pinot.controller.helix.core.assignment.segment;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.InstancePartitionsType;
@@ -34,16 +31,11 @@ import
org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class OfflineBalanceNumSegmentAssignmentStrategyTest {
+public class OfflineNonReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final String SEGMENT_NAME_PREFIX = "segment_";
private static final int NUM_SEGMENTS = 100;
@@ -57,33 +49,26 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest
{
private static final String INSTANCE_PARTITIONS_NAME =
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME);
- private SegmentAssignmentStrategy _strategy;
+ private SegmentAssignment _segmentAssignment;
+ private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMap;
@BeforeClass
public void setUp() {
+ TableConfig tableConfig =
+ new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+ _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfig);
+
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4,
instance_5, instance_6, instance_7, instance_8, instance_9]
// }
InstancePartitions instancePartitions = new
InstancePartitions(INSTANCE_PARTITIONS_NAME);
instancePartitions.setInstances(0, 0, INSTANCES);
-
- // Mock HelixManager
- @SuppressWarnings("unchecked")
- ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
- when(propertyStore
-
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME)),
any(),
- anyInt())).thenReturn(instancePartitions.toZNRecord());
- HelixManager helixManager = mock(HelixManager.class);
- when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
- TableConfig tableConfig =
- new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
- _strategy =
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager,
tableConfig);
+ _instancePartitionsMap =
Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions);
}
@Test
public void testFactory() {
- assertTrue(_strategy instanceof
OfflineBalanceNumSegmentAssignmentStrategy);
+ assertTrue(_segmentAssignment instanceof OfflineSegmentAssignment);
}
@Test
@@ -98,7 +83,8 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest {
// ...
int expectedAssignedInstanceId = 0;
for (String segmentName : SEGMENTS) {
- List<String> instancesAssigned = _strategy.assignSegment(segmentName,
currentAssignment);
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
assertEquals(instancesAssigned.get(replicaId),
INSTANCES.get(expectedAssignedInstanceId));
@@ -113,7 +99,8 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest {
public void testTableBalanced() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
- List<String> instancesAssigned = _strategy.assignSegment(segmentName,
currentAssignment);
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
}
@@ -132,6 +119,6 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest
{
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(_strategy.rebalanceTable(currentAssignment, null),
currentAssignment);
+ assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null), currentAssignment);
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
similarity index 86%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index 28586f0..089c016 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -51,7 +51,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class OfflineReplicaGroupSegmentAssignmentStrategyTest {
+public class OfflineReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final String SEGMENT_NAME_PREFIX = "segment_";
private static final int NUM_SEGMENTS = 90;
@@ -72,11 +72,20 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
private static final String PARTITION_COLUMN = "partitionColumn";
private static final int NUM_PARTITIONS = 3;
- private SegmentAssignmentStrategy _strategyWithoutPartition;
- private SegmentAssignmentStrategy _strategyWithPartition;
+ private SegmentAssignment _segmentAssignmentWithoutPartition;
+ private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMapWithoutPartition;
+ private SegmentAssignment _segmentAssignmentWithPartition;
+ private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMapWithPartition;
@BeforeClass
public void setUp() {
+ TableConfig tableConfigWithoutPartitions =
+ new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION)
+ .setNumReplicas(NUM_REPLICAS)
+
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
+ _segmentAssignmentWithoutPartition =
+ SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfigWithoutPartitions);
+
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4,
instance_5],
// 0_1=[instance_6, instance_7, instance_8, instance_9, instance_10,
instance_11],
@@ -93,54 +102,12 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
}
instancePartitionsWithoutPartition.setInstances(0, replicaId,
instancesForReplica);
}
-
- // Mock HelixManager
- @SuppressWarnings("unchecked")
- ZkHelixPropertyStore<ZNRecord> propertyStoreWithoutPartitions =
mock(ZkHelixPropertyStore.class);
- when(propertyStoreWithoutPartitions.get(eq(ZKMetadataProvider
-
.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION)),
any(), anyInt()))
- .thenReturn(instancePartitionsWithoutPartition.toZNRecord());
- HelixManager helixManagerWithoutPartitions = mock(HelixManager.class);
-
when(helixManagerWithoutPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithoutPartitions);
-
- TableConfig tableConfigWithoutPartitions =
- new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION)
- .setNumReplicas(NUM_REPLICAS)
-
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
- _strategyWithoutPartition = SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(helixManagerWithoutPartitions,
tableConfigWithoutPartitions);
-
- // {
- // 0_0=[instance_0, instance_1],
- // 0_1=[instance_6, instance_7],
- // 0_2=[instance_12, instance_13],
- // 1_0=[instance_2, instance_3],
- // 1_1=[instance_8, instance_9],
- // 1_2=[instance_14, instance_15],
- // 2_0=[instance_4, instance_5],
- // 2_1=[instance_10, instance_11],
- // 2_2=[instance_16, instance_17]
- // }
- InstancePartitions instancePartitionsWithPartition =
- new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
- int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS;
- instanceIdToAdd = 0;
- for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
- for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
- List<String> instancesForPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int i = 0; i < numInstancesPerPartition; i++) {
- instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
- }
- instancePartitionsWithPartition.setInstances(partitionId, replicaId,
instancesForPartition);
- }
- }
+ _instancePartitionsMapWithoutPartition =
+ Collections.singletonMap(InstancePartitionsType.OFFLINE,
instancePartitionsWithoutPartition);
// Mock HelixManager
@SuppressWarnings("unchecked")
ZkHelixPropertyStore<ZNRecord> propertyStoreWithPartitions =
mock(ZkHelixPropertyStore.class);
- when(propertyStoreWithPartitions.get(
-
eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION)),
- any(),
anyInt())).thenReturn(instancePartitionsWithPartition.toZNRecord());
List<ZNRecord> segmentZKMetadataZNRecords = new ArrayList<>(NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
@@ -168,14 +135,41 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
.setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig);
- _strategyWithPartition = SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(helixManagerWithPartitions,
tableConfigWithPartitions);
+ _segmentAssignmentWithPartition =
+
SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions,
tableConfigWithPartitions);
+
+ // {
+ // 0_0=[instance_0, instance_1],
+ // 0_1=[instance_6, instance_7],
+ // 0_2=[instance_12, instance_13],
+ // 1_0=[instance_2, instance_3],
+ // 1_1=[instance_8, instance_9],
+ // 1_2=[instance_14, instance_15],
+ // 2_0=[instance_4, instance_5],
+ // 2_1=[instance_10, instance_11],
+ // 2_2=[instance_16, instance_17]
+ // }
+ InstancePartitions instancePartitionsWithPartition =
+ new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
+ int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS;
+ instanceIdToAdd = 0;
+ for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
+ for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
+ List<String> instancesForPartition = new
ArrayList<>(numInstancesPerPartition);
+ for (int i = 0; i < numInstancesPerPartition; i++) {
+ instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
+ }
+ instancePartitionsWithPartition.setInstances(partitionId, replicaId,
instancesForPartition);
+ }
+ }
+ _instancePartitionsMapWithPartition =
+ Collections.singletonMap(InstancePartitionsType.OFFLINE,
instancePartitionsWithPartition);
}
@Test
public void testFactory() {
- assertTrue(_strategyWithoutPartition instanceof
OfflineReplicaGroupSegmentAssignmentStrategy);
- assertTrue(_strategyWithPartition instanceof
OfflineReplicaGroupSegmentAssignmentStrategy);
+ assertTrue(_segmentAssignmentWithoutPartition instanceof
OfflineSegmentAssignment);
+ assertTrue(_segmentAssignmentWithPartition instanceof
OfflineSegmentAssignment);
}
@Test
@@ -184,7 +178,8 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
- List<String> instancesAssigned =
_strategyWithoutPartition.assignSegment(segmentName, currentAssignment);
+ List<String> instancesAssigned = _segmentAssignmentWithoutPartition
+ .assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithoutPartition);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
@@ -212,7 +207,8 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
int numInstancesPerPartition = numInstancesPerReplica / NUM_PARTITIONS;
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = SEGMENTS.get(segmentId);
- List<String> instancesAssigned =
_strategyWithPartition.assignSegment(segmentName, currentAssignment);
+ List<String> instancesAssigned = _segmentAssignmentWithPartition
+ .assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithPartition);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
int partitionId = segmentId % NUM_PARTITIONS;
for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
@@ -240,7 +236,8 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
public void testTableBalancedWithoutPartition() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
- List<String> instancesAssigned =
_strategyWithoutPartition.assignSegment(segmentName, currentAssignment);
+ List<String> instancesAssigned = _segmentAssignmentWithoutPartition
+ .assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithoutPartition);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
}
@@ -259,14 +256,16 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(_strategyWithoutPartition.rebalanceTable(currentAssignment,
null), currentAssignment);
+ assertEquals(_segmentAssignmentWithoutPartition
+ .rebalanceTable(currentAssignment,
_instancePartitionsMapWithoutPartition, null), currentAssignment);
}
@Test
public void testTableBalancedWithPartition() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (String segmentName : SEGMENTS) {
- List<String> instancesAssigned =
_strategyWithPartition.assignSegment(segmentName, currentAssignment);
+ List<String> instancesAssigned = _segmentAssignmentWithPartition
+ .assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithPartition);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
}
@@ -285,6 +284,8 @@ public class
OfflineReplicaGroupSegmentAssignmentStrategyTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(_strategyWithPartition.rebalanceTable(currentAssignment,
null), currentAssignment);
+ assertEquals(
+ _segmentAssignmentWithPartition.rebalanceTable(currentAssignment,
_instancePartitionsMapWithPartition, null),
+ currentAssignment);
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
similarity index 80%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 0aa454c..5fee01a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -23,11 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.CommonConstants;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.InstancePartitionsType;
@@ -37,16 +33,11 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConst
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
+public class RealtimeNonReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final int NUM_PARTITIONS = 4;
private static final int NUM_SEGMENTS = 100;
@@ -65,7 +56,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME);
private List<String> _segments;
- private SegmentAssignmentStrategy _strategy;
+ private SegmentAssignment _segmentAssignment;
+ private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMap;
@BeforeClass
public void setUp() {
@@ -75,7 +67,13 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest
{
System.currentTimeMillis()).getSegmentName());
}
- // Consuming instances:
+ TableConfig tableConfig =
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(NUM_REPLICAS).setLLC(true).build();
+ _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfig);
+
+ _instancePartitionsMap = new TreeMap<>();
+ // CONSUMING instances:
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4,
instance_5, instance_6, instance_7, instance_8]
// }
@@ -83,35 +81,20 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
// p3r0 p3r1 p3r2
InstancePartitions consumingInstancePartitions = new
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
consumingInstancePartitions.setInstances(0, 0, CONSUMING_INSTANCES);
+ _instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
consumingInstancePartitions);
- // Completed instances:
+ // COMPLETED instances:
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4,
instance_5, instance_6, instance_7, instance_8, instance_9]
// }
InstancePartitions completedInstancePartitions = new
InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME);
completedInstancePartitions.setInstances(0, 0, COMPLETED_INSTANCES);
-
- // Mock HelixManager
- @SuppressWarnings("unchecked")
- ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
- when(propertyStore
-
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)),
- any(),
anyInt())).thenReturn(consumingInstancePartitions.toZNRecord());
- when(propertyStore
-
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)),
- any(),
anyInt())).thenReturn(completedInstancePartitions.toZNRecord());
- HelixManager helixManager = mock(HelixManager.class);
- when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
- TableConfig tableConfig =
- new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(NUM_REPLICAS).setLLC(true).build();
- _strategy =
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager,
tableConfig);
+ _instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
completedInstancePartitions);
}
@Test
public void testFactory() {
- assertTrue(_strategy instanceof
RealtimeBalanceNumSegmentAssignmentStrategy);
+ assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
}
@Test
@@ -119,7 +102,8 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
- List<String> instancesAssigned = _strategy.assignSegment(segmentName,
currentAssignment);
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
@@ -143,7 +127,8 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
- List<String> instancesAssigned = _strategy.assignSegment(segmentName,
currentAssignment);
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
addToAssignment(currentAssignment, segmentId, instancesAssigned);
}
@@ -154,20 +139,20 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
- // Rebalance should relocate all completed (ONLINE) segments to the
completed instances
+ // Rebalance should relocate all COMPLETED (ONLINE) segments to the
COMPLETED instances
Map<String, Map<String, String>> newAssignment =
- _strategy.rebalanceTable(currentAssignment, new BaseConfiguration());
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, new BaseConfiguration());
assertEquals(newAssignment.size(), NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
- // Completed (ONLINE) segments
+ // COMPLETED (ONLINE) segments
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
}
} else {
- // Consuming (CONSUMING) segments
+ // CONSUMING segments
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
@@ -184,10 +169,10 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
assertTrue(numSegmentsAssignedPerInstance[i] >=
expectedMinNumSegmentsPerInstance);
}
- // Rebalance all segments (both completed and consuming) should give the
same assignment
+ // Rebalance all segments (including CONSUMING) should give the same
assignment
BaseConfiguration config = new BaseConfiguration();
config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
- assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
+ assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, config), newAssignment);
// Rebalance should not change the assignment for the OFFLINE segments
String offlineSegmentName = "offlineSegment";
@@ -196,7 +181,7 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
- assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
+ assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, config), newAssignment);
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
similarity index 82%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index f74a7b2..04fc344 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -24,11 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
@@ -39,16 +35,11 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConst
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class RealtimeReplicaGroupSegmentAssignmentStrategyTest {
+public class RealtimeReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final int NUM_PARTITIONS = 4;
private static final int NUM_SEGMENTS = 100;
@@ -67,7 +58,8 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME);
private List<String> _segments;
- private SegmentAssignmentStrategy _strategy;
+ private SegmentAssignment _segmentAssignment;
+ private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMap;
@BeforeClass
public void setUp() {
@@ -77,7 +69,14 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
System.currentTimeMillis()).getSegmentName());
}
- // Consuming instances:
+ TableConfig tableConfig =
+ new
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+ .build();
+ _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfig);
+
+ _instancePartitionsMap = new TreeMap<>();
+ // CONSUMING instances:
// {
// 0_0=[instance_0, instance_1, instance_2],
// 0_1=[instance_3, instance_4, instance_5],
@@ -95,8 +94,9 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
}
consumingInstancePartitions.setInstances(0, replicaId,
consumingInstancesForReplica);
}
+ _instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
consumingInstancePartitions);
- // Completed instances:
+ // COMPLETED instances:
// {
// 0_0=[instance_0, instance_1, instance_2, instance_3],
// 0_1=[instance_4, instance_5, instance_6, instance_7],
@@ -112,29 +112,12 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
}
completedInstancePartitions.setInstances(0, replicaId,
completedInstancesForReplica);
}
-
- // Mock HelixManager
- @SuppressWarnings("unchecked")
- ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
- when(propertyStore
-
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)),
- any(),
anyInt())).thenReturn(consumingInstancePartitions.toZNRecord());
- when(propertyStore
-
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)),
- any(),
anyInt())).thenReturn(completedInstancePartitions.toZNRecord());
- HelixManager helixManager = mock(HelixManager.class);
- when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
- TableConfig tableConfig =
- new
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
- .build();
- _strategy =
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager,
tableConfig);
+ _instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
completedInstancePartitions);
}
@Test
public void testFactory() {
- assertTrue(_strategy instanceof
RealtimeReplicaGroupSegmentAssignmentStrategy);
+ assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
}
@Test
@@ -143,7 +126,8 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
- List<String> instancesAssigned = _strategy.assignSegment(segmentName,
currentAssignment);
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
@@ -167,7 +151,8 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
- List<String> instancesAssigned = _strategy.assignSegment(segmentName,
currentAssignment);
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
addToAssignment(currentAssignment, segmentId, instancesAssigned);
}
@@ -178,20 +163,20 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
- // Rebalance should relocate all completed (ONLINE) segments to the
completed instances
+ // Rebalance should relocate all COMPLETED (ONLINE) segments to the
COMPLETED instances
Map<String, Map<String, String>> newAssignment =
- _strategy.rebalanceTable(currentAssignment, new BaseConfiguration());
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, new BaseConfiguration());
assertEquals(newAssignment.size(), NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
- // Completed (ONLINE) segments
+ // COMPLETED (ONLINE) segments
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
}
} else {
- // Consuming (CONSUMING) segments
+ // CONSUMING segments
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
@@ -207,10 +192,10 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
- // Rebalance all segments (both completed and consuming) should give the
same assignment
+ // Rebalance all segments (including CONSUMING) should give the same
assignment
BaseConfiguration config = new BaseConfiguration();
config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
- assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
+ assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, config), newAssignment);
// Rebalance should not change the assignment for the OFFLINE segments
String offlineSegmentName = "offlineSegment";
@@ -219,7 +204,7 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
- assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
+ assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, config), newAssignment);
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]