This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d69302  [Instance Assignment] De-couple assignment strategy from 
SegmentAssignment (#4533)
0d69302 is described below

commit 0d69302007bc5c452c0c9dd9b2389a4582cb84bd
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Aug 19 14:15:21 2019 -0700

    [Instance Assignment] De-couple assignment strategy from SegmentAssignment 
(#4533)
    
    For REALTIME segments, we should be able to assign CONSUMING and
    COMPLETED segments with different strategies, i.e. one could
    follows replica-group while the other follows balance-number. In
    order to do so, we need to de-couple the assignment strategy from
    the segment assignment, so each assignment can use different
    strategies if necessary.
    
    - Change SegmentAssignmentStrategy interface to SegmentAssignment
    - Add OfflineSegmentAssignment and RealtimeSegmentAssignment
    - Use InstancePartitions to determine the strategy to use
    - For REALTIME table, support different strategies for CONSUMING
      and COMPLETED segments
---
 .../helix/core/assignment/InstancePartitions.java  |  17 ++
 ...OfflineBalanceNumSegmentAssignmentStrategy.java |  96 --------
 ...flineReplicaGroupSegmentAssignmentStrategy.java | 200 ----------------
 .../segment/OfflineSegmentAssignment.java          | 260 +++++++++++++++++++++
 ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 161 -------------
 ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 165 -------------
 .../segment/RealtimeSegmentAssignment.java         | 218 +++++++++++++++++
 ...ignmentStrategy.java => SegmentAssignment.java} |  27 ++-
 .../segment/SegmentAssignmentFactory.java          |  43 ++++
 .../segment/SegmentAssignmentStrategyFactory.java  |  54 -----
 .../assignment/segment/SegmentAssignmentUtils.java |  26 +--
 ...PinotInstanceAssignmentRestletResourceTest.java |   2 +-
 ...flineNonReplicaGroupSegmentAssignmentTest.java} |  43 ++--
 ... OfflineReplicaGroupSegmentAssignmentTest.java} | 115 ++++-----
 ...ltimeNonReplicaGroupSegmentAssignmentTest.java} |  65 ++----
 ...RealtimeReplicaGroupSegmentAssignmentTest.java} |  67 +++---
 16 files changed, 692 insertions(+), 867 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
index 4ed659e..4060f4b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
@@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.utils.JsonUtils;
 
 
 /**
@@ -64,10 +66,12 @@ public class InstancePartitions {
     }
   }
 
+  @JsonProperty
   public String getName() {
     return _name;
   }
 
+  @JsonProperty
   public Map<String, List<String>> getPartitionToInstancesMap() {
     return _partitionToInstancesMap;
   }
@@ -102,4 +106,17 @@ public class InstancePartitions {
     znRecord.setListFields(_partitionToInstancesMap);
     return znRecord;
   }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.objectToString(this);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonString();
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
deleted file mode 100644
index 4bd34b5..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.common.utils.Pairs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for offline segments that assigns segment to 
the instance with the least number of
- * segments. In case of a tie, assigns to the instance with the smallest index 
in the list. The strategy ensures that
- * replicas of the same segment are not assigned to the same server.
- * <p>To rebalance a table, use Helix AutoRebalanceStrategy.
- */
-public class OfflineBalanceNumSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(OfflineBalanceNumSegmentAssignmentStrategy.class);
-
-  private HelixManager _helixManager;
-  private TableConfig _tableConfig;
-  private String _tableNameWithType;
-  private int _replication;
-
-  @Override
-  public void init(HelixManager helixManager, TableConfig tableConfig) {
-    _helixManager = helixManager;
-    _tableConfig = tableConfig;
-    _tableNameWithType = tableConfig.getTableName();
-    _replication = tableConfig.getValidationConfig().getReplicationNumber();
-
-    LOGGER.info("Initialized OfflineBalanceNumSegmentAssignmentStrategy for 
table: {} with replication: {}",
-        _tableNameWithType, _replication);
-  }
-
-  @Override
-  public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment) {
-    List<String> instances = SegmentAssignmentUtils
-        .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, 
_replication, InstancePartitionsType.OFFLINE);
-    int[] numSegmentsAssignedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, 
instances);
-
-    // Assign the segment to the instance with the least segments, or the 
smallest id if there is a tie
-    int numInstances = numSegmentsAssignedPerInstance.length;
-    PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, 
Pairs.intPairComparator());
-    for (int instanceId = 0; instanceId < numInstances; instanceId++) {
-      heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], 
instanceId));
-    }
-    List<String> instancesAssigned = new ArrayList<>(_replication);
-    for (int i = 0; i < _replication; i++) {
-      instancesAssigned.add(instances.get(heap.remove().getRight()));
-    }
-
-    LOGGER.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
-        _tableNameWithType);
-    return instancesAssigned;
-  }
-
-  @Override
-  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Configuration config) {
-    List<String> instances = SegmentAssignmentUtils
-        .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, 
_replication, InstancePartitionsType.OFFLINE);
-    Map<String, Map<String, String>> newAssignment =
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 instances, _replication);
-
-    LOGGER.info(
-        "Rebalanced {} segments to instances: {} for table: {} with 
replication: {}, number of segments to be moved to each instance: {}",
-        currentAssignment.size(), instances, _tableNameWithType, _replication,
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
-    return newAssignment;
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
deleted file mode 100644
index 8124d30..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
-import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for offline segments that assigns segment to 
the instance in the replica with the least
- * number of segments.
- * <p>Among multiple replicas, always mirror the assignment (pick the same 
index of the instance).
- * <p>Inside each partition, assign the segment to the servers with the least 
segments already assigned. In case of a
- * tie, assign to the server with the smallest index in the list. Do this for 
one replica and mirror the assignment to
- * other replicas.
- * <p>To rebalance a table, inside each partition, first calculate the number 
of segments on each server, loop over all
- * the segments and keep the assignment if number of segments for the server 
has not been reached and track the not
- * assigned segments, then assign the left-over segments to the servers with 
the least segments, or the smallest index
- * if there is a tie. Repeat the process for all the partitions in one 
replica, and mirror the assignment to other
- * replicas. With this greedy algorithm, the result is deterministic and with 
minimum segment moves.
- */
-public class OfflineReplicaGroupSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(OfflineReplicaGroupSegmentAssignmentStrategy.class);
-
-  private HelixManager _helixManager;
-  private TableConfig _tableConfig;
-  private String _tableNameWithType;
-  private String _partitionColumn;
-
-  @Override
-  public void init(HelixManager helixManager, TableConfig tableConfig) {
-    _helixManager = helixManager;
-    _tableConfig = tableConfig;
-    _tableNameWithType = tableConfig.getTableName();
-    ReplicaGroupStrategyConfig strategyConfig = 
tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
-    _partitionColumn = strategyConfig != null ? 
strategyConfig.getPartitionColumn() : null;
-
-    if (_partitionColumn == null) {
-      LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy 
for table: {} without partition column",
-          _tableNameWithType);
-    } else {
-      LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy 
for table: {} with partition column: {}",
-          _tableNameWithType, _partitionColumn);
-    }
-  }
-
-  @Override
-  public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment) {
-    InstancePartitions instancePartitions = InstancePartitionsUtils
-        .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, 
InstancePartitionsType.OFFLINE);
-
-    // Fetch partition id from segment ZK metadata if partition column is 
configured
-    int partitionId = 0;
-    if (_partitionColumn == null) {
-      Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
-          "The instance partitions: %s should contain only 1 partition", 
instancePartitions.getName());
-    } else {
-      OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
-          .getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(), 
_tableNameWithType, segmentName);
-      Preconditions
-          .checkState(segmentZKMetadata != null, "Failed to fetch segment ZK 
metadata for table: %s, segment: %s",
-              _tableNameWithType, segmentName);
-      // Uniformly spray the segment partitions over the instance partitions
-      partitionId = getPartitionId(segmentZKMetadata) % 
instancePartitions.getNumPartitions();
-    }
-
-    // First assign the segment to replica 0
-    List<String> instances = instancePartitions.getInstances(partitionId, 0);
-    int[] numSegmentsAssignedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, 
instances);
-    int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0];
-    int instanceIdWithLeastSegmentsAssigned = 0;
-    int numInstances = numSegmentsAssignedPerInstance.length;
-    for (int instanceId = 1; instanceId < numInstances; instanceId++) {
-      if (numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned) 
{
-        minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId];
-        instanceIdWithLeastSegmentsAssigned = instanceId;
-      }
-    }
-
-    // Mirror the assignment to all replicas
-    int numReplicas = instancePartitions.getNumReplicas();
-    List<String> instancesAssigned = new ArrayList<>(numReplicas);
-    for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
-      instancesAssigned
-          .add(instancePartitions.getInstances(partitionId, 
replicaId).get(instanceIdWithLeastSegmentsAssigned));
-    }
-
-    if (_partitionColumn == null) {
-      LOGGER.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
-          _tableNameWithType);
-    } else {
-      LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} 
for table: {}", segmentName, partitionId,
-          instancesAssigned, _tableNameWithType);
-    }
-    return instancesAssigned;
-  }
-
-  @Override
-  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Configuration config) {
-    InstancePartitions instancePartitions = InstancePartitionsUtils
-        .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, 
InstancePartitionsType.OFFLINE);
-    if (_partitionColumn == null) {
-      return rebalanceTableWithoutPartition(currentAssignment, 
instancePartitions);
-    } else {
-      return rebalanceTableWithPartition(currentAssignment, 
instancePartitions);
-    }
-  }
-
-  private Map<String, Map<String, String>> rebalanceTableWithoutPartition(
-      Map<String, Map<String, String>> currentAssignment, InstancePartitions 
instancePartitions) {
-    Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
-        "The instance partitions: %s should contain only 1 partition", 
instancePartitions.getName());
-
-    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
-    SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedPartition(currentAssignment, 
instancePartitions, 0, currentAssignment.keySet(),
-            newAssignment);
-
-    LOGGER.info(
-        "Rebalanced {} segments with instance partitions: {} for table: {} 
without partition column, number of segments to be moved to each instance: {}",
-        currentAssignment.size(), 
instancePartitions.getPartitionToInstancesMap(), _tableNameWithType,
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
-    return newAssignment;
-  }
-
-  private Map<String, Map<String, String>> rebalanceTableWithPartition(
-      Map<String, Map<String, String>> currentAssignment, InstancePartitions 
instancePartitions) {
-    // Fetch partition id from segment ZK metadata
-    List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider
-        
.getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(), 
_tableNameWithType);
-    Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new 
HashMap<>();
-    for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), 
segmentZKMetadata);
-    }
-    Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
-    for (String segmentName : currentAssignment.keySet()) {
-      int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
-      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new 
HashSet<>()).add(segmentName);
-    }
-
-    Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(currentAssignment, 
instancePartitions, partitionIdToSegmentsMap);
-
-    LOGGER.info(
-        "Rebalanced {} segments with instance partitions: {} for table: {} 
with partition column: {}, number of segments to be moved to each instance: {}",
-        currentAssignment.size(), 
instancePartitions.getPartitionToInstancesMap(), _tableNameWithType, 
_partitionColumn,
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
-    return newAssignment;
-  }
-
-  private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) {
-    String segmentName = segmentZKMetadata.getSegmentName();
-    ColumnPartitionMetadata partitionMetadata =
-        
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn);
-    Preconditions.checkState(partitionMetadata != null,
-        "Segment ZK metadata for table: %s, segment: %s does not contain 
partition metadata for column: %s",
-        _tableNameWithType, segmentName, _partitionColumn);
-    Set<Integer> partitions = partitionMetadata.getPartitions();
-    Preconditions.checkState(partitions.size() == 1,
-        "Segment ZK metadata for table: %s, segment: %s contains multiple 
partitions for column: %s",
-        _tableNameWithType, segmentName, _partitionColumn);
-    return partitions.iterator().next();
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
new file mode 100644
index 0000000..5dfb034
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Segment assignment for offline table.
+ * <ul>
+ *   <li>
+ *     Non-replica-group based assignment (only 1 replica in instance 
partitions):
+ *     <p>Assign the segment to the instance with the least number of 
segments. In case of a tie, assign the segment to
+ *     the instance with the smallest index in the list. Use Helix 
AutoRebalanceStrategy to rebalance the table.
+ *   </li>
+ *   <li>
+ *     Replica-group based assignment (more than 1 replicas in instance 
partitions):
+ *     <p>Among replicas, always mirror the assignment (pick the same index of 
the instance).
+ *     <p>Within each partition, assign the segment to the servers with the 
least segments already assigned. In case of
+ *     a tie, assign to the server with the smallest index in the list. Do 
this for one replica and mirror the
+ *     assignment to other replicas.
+ *     <p>To rebalance a table, within each partition, first calculate the 
number of segments on each server, loop over
+ *     all the segments and keep the assignment if number of segments for the 
server has not been reached and track the
+ *     not assigned segments, then assign the left-over segments to the 
servers with the least segments, or the smallest
+ *     index if there is a tie. Repeat the process for all the partitions in 
one replica, and mirror the assignment to
+ *     other replicas. With this greedy algorithm, the result is deterministic 
and with minimum segment moves.
+ *   </li>
+ * </ul>
+ */
+public class OfflineSegmentAssignment implements SegmentAssignment {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OfflineSegmentAssignment.class);
+
+  private HelixManager _helixManager;
+  private String _offlineTableName;
+  private int _replication;
+  private String _partitionColumn;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _helixManager = helixManager;
+    _offlineTableName = tableConfig.getTableName();
+    _replication = tableConfig.getValidationConfig().getReplicationNumber();
+    ReplicaGroupStrategyConfig strategyConfig = 
tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
+    _partitionColumn = strategyConfig != null ? 
strategyConfig.getPartitionColumn() : null;
+
+    if (_partitionColumn == null) {
+      LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} 
without partition column for table: {} ",
+          _replication, _offlineTableName);
+    } else {
+      LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} 
and partition column: {} for table: {}",
+          _replication, _partitionColumn, _offlineTableName);
+    }
+  }
+
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
+        _offlineTableName);
+    LOGGER.info("Assigning segment: {} with instance partitions: {} for table: 
{}", segmentName, instancePartitions,
+        _offlineTableName);
+
+    List<String> instancesAssigned;
+    if (instancePartitions.getNumReplicas() == 1) {
+      // Non-replica-group based assignment
+
+      // Assign the segment to the instance with the least segments, or the 
smallest id if there is a tie
+      List<String> instances =
+          
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
 _replication);
+      int[] numSegmentsAssignedPerInstance =
+          
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, 
instances);
+      int numInstances = numSegmentsAssignedPerInstance.length;
+      PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, 
Pairs.intPairComparator());
+      for (int instanceId = 0; instanceId < numInstances; instanceId++) {
+        heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], 
instanceId));
+      }
+      instancesAssigned = new ArrayList<>(_replication);
+      for (int i = 0; i < _replication; i++) {
+        instancesAssigned.add(instances.get(heap.remove().getRight()));
+      }
+    } else {
+      // Replica-group based assignment
+
+      int numReplicas = instancePartitions.getNumReplicas();
+      if (numReplicas != _replication) {
+        LOGGER.warn(
+            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
+            instancePartitions.getName(), numReplicas, _replication, 
_offlineTableName, numReplicas);
+      }
+
+      // Fetch partition id from segment ZK metadata if partition column is 
configured
+      int partitionId;
+      if (_partitionColumn == null) {
+        LOGGER.info("Assigning segment: {} without partition column for table: 
{}", segmentName, _offlineTableName);
+
+        Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
+            "Instance partitions: %s should contain 1 partition without 
partition column",
+            instancePartitions.getName());
+        partitionId = 0;
+      } else {
+        LOGGER.info("Assigning segment: {} with partition column: {} for 
table: {}", segmentName, _partitionColumn,
+            _offlineTableName);
+
+        OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+            
.getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(), 
_offlineTableName, segmentName);
+        Preconditions
+            .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
+                segmentName, _offlineTableName);
+        int segmentPartitionId = getPartitionId(segmentZKMetadata);
+
+        // Uniformly spray the segment partitions over the instance partitions
+        int numPartitions = instancePartitions.getNumPartitions();
+        partitionId = segmentPartitionId % numPartitions;
+        LOGGER.info("Assigning segment: {} with partition id: {} to partition: 
{}/{} for table: {}", segmentName,
+            segmentPartitionId, partitionId, numPartitions, _offlineTableName);
+      }
+
+      // First assign the segment to replica 0
+      List<String> instances = instancePartitions.getInstances(partitionId, 0);
+      int[] numSegmentsAssignedPerInstance =
+          
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, 
instances);
+      int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0];
+      int instanceIdWithLeastSegmentsAssigned = 0;
+      int numInstances = numSegmentsAssignedPerInstance.length;
+      for (int instanceId = 1; instanceId < numInstances; instanceId++) {
+        if (numSegmentsAssignedPerInstance[instanceId] < 
minNumSegmentsAssigned) {
+          minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId];
+          instanceIdWithLeastSegmentsAssigned = instanceId;
+        }
+      }
+
+      // Mirror the assignment to all replicas
+      instancesAssigned = new ArrayList<>(numReplicas);
+      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+        instancesAssigned
+            .add(instancePartitions.getInstances(partitionId, 
replicaId).get(instanceIdWithLeastSegmentsAssigned));
+      }
+    }
+
+    LOGGER
+        .info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned, _offlineTableName);
+    return instancesAssigned;
+  }
+
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
+        _offlineTableName);
+    LOGGER.info("Rebalancing table: {} with instance partitions: {}", 
_offlineTableName, instancePartitions);
+
+    Map<String, Map<String, String>> newAssignment;
+    if (instancePartitions.getNumReplicas() == 1) {
+      // Non-replica-group based assignment
+
+      List<String> instances =
+          
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
 _replication);
+      newAssignment = SegmentAssignmentUtils
+          .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, 
instances, _replication);
+    } else {
+      // Replica-group based assignment
+
+      int numReplicas = instancePartitions.getNumReplicas();
+      if (numReplicas != _replication) {
+        LOGGER.warn(
+            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
+            instancePartitions.getName(), numReplicas, _replication, 
_offlineTableName, numReplicas);
+      }
+
+      if (_partitionColumn == null) {
+        LOGGER.info("Rebalancing table: {} without partition column", 
_offlineTableName);
+        Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
+            "Instance partitions: %s should contain 1 partition without 
partition column",
+            instancePartitions.getName());
+        newAssignment = new TreeMap<>();
+        SegmentAssignmentUtils
+            .rebalanceReplicaGroupBasedPartition(currentAssignment, 
instancePartitions, 0, currentAssignment.keySet(),
+                newAssignment);
+      } else {
+        LOGGER.info("Rebalancing table: {} with partition column: {}", 
_offlineTableName, _partitionColumn);
+        newAssignment = rebalanceTableWithPartition(currentAssignment, 
instancePartitions);
+      }
+    }
+
+    LOGGER.info("Rebalanced table: {}, number of segments to be moved to each 
instance: {}", _offlineTableName,
+        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
+    return newAssignment;
+  }
+
+  private Map<String, Map<String, String>> rebalanceTableWithPartition(
+      Map<String, Map<String, String>> currentAssignment, InstancePartitions 
instancePartitions) {
+    // Fetch partition id from segment ZK metadata
+    List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider
+        
.getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(), 
_offlineTableName);
+    Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new 
HashMap<>();
+    for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), 
segmentZKMetadata);
+    }
+    Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
+    for (String segmentName : currentAssignment.keySet()) {
+      int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
+      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new 
HashSet<>()).add(segmentName);
+    }
+
+    return SegmentAssignmentUtils
+        .rebalanceReplicaGroupBasedTable(currentAssignment, 
instancePartitions, partitionIdToSegmentsMap);
+  }
+
+  private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) {
+    String segmentName = segmentZKMetadata.getSegmentName();
+    ColumnPartitionMetadata partitionMetadata =
+        
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn);
+    Preconditions.checkState(partitionMetadata != null,
+        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata for column: %s",
+        segmentName, _offlineTableName, _partitionColumn);
+    Set<Integer> partitions = partitionMetadata.getPartitions();
+    Preconditions.checkState(partitions.size() == 1,
+        "Segment ZK metadata for segment: %s of table: %s contains multiple 
partitions for column: %s", segmentName,
+        _offlineTableName, _partitionColumn);
+    return partitions.iterator().next();
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
deleted file mode 100644
index 3319ad7..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for LLC real-time segments without 
replica-group.
- * <ul>
- *   <li>
- *     For the CONSUMING segments, it is very similar to replica-group based 
segment assignment with the following
- *     differences:
- *     <ul>
- *       <li>
- *         1. Within a replica, all segments of a partition (steam partition) 
always exist in exactly one one server
- *       </li>
- *       <li>
- *         2. Partition id for an instance is derived from the index of the 
instance, instead of explicitly stored in
- *         the instance partitions
- *       </li>
- *       <li>
- *         3. In addition to the ONLINE segments, there are also CONSUMING 
segments to be assigned
- *       </li>
- *     </ul>
- *     Since within a replica, each partition contains only one server, we can 
directly assign or rebalance the
- *     CONSUMING segments to the servers based on the partition id.
- *     <p>The strategy does not minimize segment movements for CONSUMING 
segments because within a replica, the server
- *     is fixed for each partition. The instance assignment is responsible for 
keeping minimum changes to the instance
- *     partitions to reduce the number of segments need to be moved.
- *   </li>
- *   <li>
- *     For the COMPLETED segments, rebalance segments the same way as 
OfflineBalanceNumSegmentAssignmentStrategy.
- *   </li>
- * </ul>
- */
-public class RealtimeBalanceNumSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeBalanceNumSegmentAssignmentStrategy.class);
-
-  private HelixManager _helixManager;
-  private TableConfig _tableConfig;
-  private String _tableNameWithType;
-  private int _replication;
-
-  @Override
-  public void init(HelixManager helixManager, TableConfig tableConfig) {
-    _helixManager = helixManager;
-    _tableConfig = tableConfig;
-    _tableNameWithType = tableConfig.getTableName();
-    _replication = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-
-    LOGGER.info("Initialized RealtimeBalanceNumSegmentAssignmentStrategy for 
table: {} with replication: {}",
-        _tableNameWithType, _replication);
-  }
-
-  @Override
-  public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment) {
-    List<String> instances = SegmentAssignmentUtils
-        .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, 
_replication, InstancePartitionsType.CONSUMING);
-    int partitionId = new LLCSegmentName(segmentName).getPartitionId();
-    List<String> instancesAssigned = getInstances(instances, partitionId);
-    LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} 
for table: {}", segmentName, partitionId,
-        instancesAssigned, _tableNameWithType);
-    return instancesAssigned;
-  }
-
-  @Override
-  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Configuration config) {
-    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
-        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
-
-    // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment =
-        
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
-    List<String> instancesForCompletedSegments = SegmentAssignmentUtils
-        .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, 
_replication, InstancePartitionsType.COMPLETED);
-    Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
-        
.rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, 
instancesForCompletedSegments,
-            _replication);
-
-    // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment =
-        
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
-    if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
-        RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
-      List<String> instancesForConsumingSegments = SegmentAssignmentUtils
-          .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, 
_replication,
-              InstancePartitionsType.CONSUMING);
-      for (String segmentName : consumingSegmentAssignment.keySet()) {
-        int partitionId = new LLCSegmentName(segmentName).getPartitionId();
-        List<String> instancesAssigned = 
getInstances(instancesForConsumingSegments, partitionId);
-        Map<String, String> instanceStateMap = SegmentAssignmentUtils
-            .getInstanceStateMap(instancesAssigned, 
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
-        newAssignment.put(segmentName, instanceStateMap);
-      }
-      LOGGER.info(
-          "Rebalanced {} COMPLETED segments to instances: {} and {} CONSUMING 
segments to instances: {} for table: {} with replication: {}, number of 
segments to be moved to each instances: {}",
-          completedSegmentAssignment.size(), instancesForCompletedSegments, 
consumingSegmentAssignment.size(),
-          instancesForConsumingSegments, _tableNameWithType, _replication,
-          
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
-    } else {
-      LOGGER.info(
-          "Rebalanced {} COMPLETED segments to instances: {} for table: {} 
with replication: {}, number of segments to be moved to each instance: {}",
-          completedSegmentAssignment.size(), instancesForCompletedSegments, 
_tableNameWithType, _replication,
-          
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment,
 newAssignment));
-      newAssignment.putAll(consumingSegmentAssignment);
-    }
-
-    // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the OFFLINE
-    // segments and re-assign them
-    
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
-
-    return newAssignment;
-  }
-
-  /**
-   * Returns the instances for the given partition id for CONSUMING segments.
-   * <p>Uniformly spray the partitions and replicas across the instances.
-   * <p>E.g. (6 servers, 3 partitions, 4 replicas)
-   * <pre>
-   *   "0_0": [i0,   i1,   i2,   i3,   i4,   i5  ]
-   *           p0r0, p0r1, p0r2, p1r3, p1r0, p1r1
-   *           p1r2, p1r3, p2r0, p2r1, p2r2, p2r3
-   * </pre>
-   */
-  private List<String> getInstances(List<String> instances, int partitionId) {
-    List<String> instancesAssigned = new ArrayList<>(_replication);
-    for (int replicaId = 0; replicaId < _replication; replicaId++) {
-      instancesAssigned.add(instances.get((partitionId * _replication + 
replicaId) % instances.size()));
-    }
-    return instancesAssigned;
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
deleted file mode 100644
index 05febc8..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
-import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
-import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Segment assignment strategy for LLC real-time segments (both consuming and 
completed).
- * <p>It is very similar to replica-group based segment assignment with the 
following differences:
- * <ul>
- *   <li>1. Inside one replica, each partition (stream partition) always 
contains one server</li>
- *   <li>
- *     2. Partition id for an instance is derived from the index of the 
instance in the replica group, instead of
- *     explicitly stored in the instance partitions
- *   </li>
- *   <li>3. In addition to the ONLINE segments, there are also CONSUMING 
segments to be assigned</li>
- * </ul>
- * <p>
- *   Since each partition contains only one server (in one replica), we can 
directly assign or rebalance segments to the
- *   servers based on the partition id.
- * <p>
- *   The real-time segment assignment does not minimize segment moves because 
the server is fixed for each partition in
- *   each replica. The instance assignment is responsible for keeping minimum 
changes to the instance partitions to
- *   reduce the number of segments need to be moved.
- */
-public class RealtimeReplicaGroupSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeReplicaGroupSegmentAssignmentStrategy.class);
-
-  private HelixManager _helixManager;
-  private TableConfig _tableConfig;
-  private String _tableNameWithType;
-
-  @Override
-  public void init(HelixManager helixManager, TableConfig tableConfig) {
-    _helixManager = helixManager;
-    _tableConfig = tableConfig;
-    _tableNameWithType = tableConfig.getTableName();
-
-    LOGGER.info("Initialized RealtimeReplicaGroupSegmentAssignmentStrategy for 
table: {}", _tableNameWithType);
-  }
-
-  @Override
-  public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment) {
-    InstancePartitions instancePartitions = InstancePartitionsUtils
-        .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, 
InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
-        "The instance partitions: %s should contain only 1 partition", 
instancePartitions.getName());
-
-    int partitionId = new LLCSegmentName(segmentName).getPartitionId();
-    List<String> instancesAssigned = getInstances(instancePartitions, 
partitionId);
-    LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} 
for table: {}", segmentName, partitionId,
-        instancesAssigned, _tableNameWithType);
-    return instancesAssigned;
-  }
-
-  @Override
-  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Configuration config) {
-    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
-        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
-
-    // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment =
-        
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
-    InstancePartitions instancePartitionsForCompletedSegments = 
InstancePartitionsUtils
-        .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, 
InstancePartitionsType.COMPLETED);
-    Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
-    for (String segmentName : completedSegmentAssignment.keySet()) {
-      int partitionId = new LLCSegmentName(segmentName).getPartitionId();
-      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new 
HashSet<>()).add(segmentName);
-    }
-    Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, 
instancePartitionsForCompletedSegments,
-            partitionIdToSegmentsMap);
-
-    // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment =
-        
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
-    if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
-        RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
-      InstancePartitions instancePartitionsForConsumingSegments = 
InstancePartitionsUtils
-          .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, 
InstancePartitionsType.CONSUMING);
-      for (String segmentName : consumingSegmentAssignment.keySet()) {
-        int partitionId = new LLCSegmentName(segmentName).getPartitionId();
-        List<String> instancesAssigned = 
getInstances(instancePartitionsForConsumingSegments, partitionId);
-        Map<String, String> instanceStateMap = 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
-            
CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
-        newAssignment.put(segmentName, instanceStateMap);
-      }
-      LOGGER.info(
-          "Rebalanced {} COMPLETED segments with instance partitions: {} and 
{} CONSUMING segments with instance partitions: {} for table: {}, number of 
segments to be moved to each instances: {}",
-          completedSegmentAssignment.size(), 
instancePartitionsForCompletedSegments.getPartitionToInstancesMap(),
-          consumingSegmentAssignment.size(), 
instancePartitionsForConsumingSegments.getPartitionToInstancesMap(),
-          _tableNameWithType,
-          
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
-    } else {
-      LOGGER.info(
-          "Rebalanced {} COMPLETED segments with instance partitions: {} for 
table: {}, number of segments to be moved to each instance: {}",
-          completedSegmentAssignment.size(), 
instancePartitionsForCompletedSegments.getPartitionToInstancesMap(),
-          _tableNameWithType,
-          
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment,
 newAssignment));
-      newAssignment.putAll(consumingSegmentAssignment);
-    }
-
-    // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the OFFLINE
-    // segments and re-assign them
-    
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
-
-    return newAssignment;
-  }
-
-  /**
-   * Returns the instances for the given partition id for CONSUMING segments.
-   * <p>Within a replica, uniformly spray the partitions across the instances.
-   * <p>E.g. (within a replica, 3 servers, 6 partitions)
-   * <pre>
-   *   "0_0": [i0, i1, i2]
-   *           p0  p1  p2
-   *           p3  p4  p5
-   * </pre>
-   */
-  private List<String> getInstances(InstancePartitions instancePartitions, int 
partitionId) {
-    int numReplicas = instancePartitions.getNumReplicas();
-    List<String> instancesAssigned = new ArrayList<>(numReplicas);
-    for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
-      List<String> instances = instancePartitions.getInstances(0, replicaId);
-      instancesAssigned.add(instances.get(partitionId % instances.size()));
-    }
-    return instancesAssigned;
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
new file mode 100644
index 0000000..ab279fb
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TableConfig;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Segment assignment for LLC real-time table.
+ * <ul>
+ *   <li>
+ *     For the CONSUMING segments, it is very similar to replica-group based 
segment assignment with the following
+ *     differences:
+ *     <ul>
+ *       <li>
+ *         1. Within a replica, all segments of the same partition (steam 
partition) are always assigned to exactly one
+ *         server, and because of that we can directly assign or rebalance the 
CONSUMING segments to the servers based
+ *         on the partition id
+ *       </li>
+ *       <li>
+ *         2. Partition id for an instance is derived from the index of the 
instance (within the replica-group for
+ *         replica-group based assignment), instead of explicitly stored in 
the instance partitions
+ *       </li>
+ *     </ul>
+ *   </li>
+ *   <li>
+ *     For the COMPLETED segments, rebalance segments the same way as 
OfflineSegmentAssignment.
+ *   </li>
+ * </ul>
+ */
+public class RealtimeSegmentAssignment implements SegmentAssignment {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentAssignment.class);
+
+  private String _realtimeTableName;
+  private int _replication;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _realtimeTableName = tableConfig.getTableName();
+    _replication = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+
+    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} 
for table: {}", _replication,
+        _realtimeTableName);
+  }
+
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+        _realtimeTableName);
+    Preconditions
+        .checkState(instancePartitions.getNumPartitions() == 1, "Instance 
partitions: %s should contain 1 partition",
+            instancePartitions.getName());
+    LOGGER.info("Assigning segment: {} with instance partitions: {} for table: 
{}", segmentName, instancePartitions,
+        _realtimeTableName);
+
+    List<String> instancesAssigned = assignSegment(segmentName, 
instancePartitions);
+    LOGGER.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
+        _realtimeTableName);
+    return instancesAssigned;
+  }
+
+  /**
+   * Helper method to assign instances based on the segment partition id and 
instance partitions.
+   */
+  private List<String> assignSegment(String segmentName, InstancePartitions 
instancePartitions) {
+    int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+
+    if (instancePartitions.getNumReplicas() == 1) {
+      // Non-replica-group based assignment:
+      // Uniformly spray the partitions and replicas across the instances.
+      // E.g. (6 servers, 3 partitions, 4 replicas)
+      // "0_0": [i0,  i1,  i2,  i3,  i4,  i5  ]
+      //         p0r0 p0r1 p0r2 p1r3 p1r0 p1r1
+      //         p1r2 p1r3 p2r0 p2r1 p2r2 p2r3
+
+      List<String> instances =
+          
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
 _replication);
+      int numInstances = instances.size();
+      List<String> instancesAssigned = new ArrayList<>(_replication);
+      for (int replicaId = 0; replicaId < _replication; replicaId++) {
+        instancesAssigned.add(instances.get((partitionId * _replication + 
replicaId) % numInstances));
+      }
+      return instancesAssigned;
+    } else {
+      // Replica-group based assignment:
+      // Within a replica, uniformly spray the partitions across the instances.
+      // E.g. (within a replica, 3 servers, 6 partitions)
+      // "0_0": [i0, i1, i2]
+      //         p0  p1  p2
+      //         p3  p4  p5
+
+      int numReplicas = instancePartitions.getNumReplicas();
+      if (numReplicas != _replication) {
+        LOGGER.warn(
+            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
+            instancePartitions.getName(), numReplicas, _replication, 
_realtimeTableName, numReplicas);
+      }
+
+      List<String> instancesAssigned = new ArrayList<>(numReplicas);
+      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+        List<String> instances = instancePartitions.getInstances(0, replicaId);
+        instancesAssigned.add(instances.get(partitionId % instances.size()));
+      }
+      return instancesAssigned;
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
+        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
+
+    // Rebalance COMPLETED segments first
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
+    InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    Preconditions
+        .checkState(completedInstancePartitions != null, "Failed to find 
COMPLETED instance partitions for table: %s",
+            _realtimeTableName);
+    LOGGER.info("Rebalancing COMPLETED segments for table: {} with instance 
partitions: {}", _realtimeTableName,
+        completedInstancePartitions);
+
+    Map<String, Map<String, String>> newAssignment;
+    if (completedInstancePartitions.getNumReplicas() == 1) {
+      // Non-replica-group based assignment
+
+      List<String> instances = SegmentAssignmentUtils
+          
.getInstancesForNonReplicaGroupBasedAssignment(completedInstancePartitions, 
_replication);
+      newAssignment = SegmentAssignmentUtils
+          
.rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, 
instances, _replication);
+    } else {
+      // Replica-group based assignment
+
+      int numReplicas = completedInstancePartitions.getNumReplicas();
+      if (numReplicas != _replication) {
+        LOGGER.warn(
+            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
+            completedInstancePartitions.getName(), numReplicas, _replication, 
_realtimeTableName, numReplicas);
+      }
+
+      Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
+      for (String segmentName : completedSegmentAssignment.keySet()) {
+        int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+        partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new 
HashSet<>()).add(segmentName);
+      }
+      newAssignment = SegmentAssignmentUtils
+          .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, 
completedInstancePartitions,
+              partitionIdToSegmentsMap);
+    }
+
+    // Rebalance CONSUMING segments if configured
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
+    if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
+        RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
+      InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+      Preconditions
+          .checkState(consumingInstancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+              _realtimeTableName);
+      Preconditions.checkState(consumingInstancePartitions.getNumPartitions() 
== 1,
+          "Instance partitions: %s should contain 1 partition", 
consumingInstancePartitions.getName());
+      LOGGER.info("Rebalancing CONSUMING segments for table: {} with instance 
partitions: {}", _realtimeTableName,
+          consumingInstancePartitions);
+
+      for (String segmentName : consumingSegmentAssignment.keySet()) {
+        List<String> instancesAssigned = assignSegment(segmentName, 
consumingInstancePartitions);
+        Map<String, String> instanceStateMap = SegmentAssignmentUtils
+            .getInstanceStateMap(instancesAssigned, 
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+        newAssignment.put(segmentName, instanceStateMap);
+      }
+    } else {
+      newAssignment.putAll(consumingSegmentAssignment);
+    }
+
+    // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the OFFLINE
+    // segments and re-assign them
+    
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
+    LOGGER.info("Rebalanced table: {}, number of segments to be moved to each 
instance: {}", _realtimeTableName,
+        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
+    return newAssignment;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
similarity index 59%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
index de2ea0d..fd613ff 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
@@ -23,15 +23,21 @@ import java.util.Map;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
 
 
 /**
- * Strategy to assign segment to instances or rebalance all segments in a 
table.
+ * Interface for segment assignment and table rebalance.
+ * <p>
+ * TODO: Add SegmentAssignmentStrategy interface and support custom segment 
assignment strategy (e.g. cost based segment
+ *       assignment). SegmentAssignmentStrategy should not be coupled with 
SegmentAssignment, and SegmentAssignment
+ *       should be able to choose the segment assignment strategy based on the 
configuration.
  */
-public interface SegmentAssignmentStrategy {
+public interface SegmentAssignment {
 
   /**
-   * Initializes the segment assignment strategy.
+   * Initializes the segment assignment.
    *
    * @param helixManager Helix manager
    * @param tableConfig Table config
@@ -39,21 +45,24 @@ public interface SegmentAssignmentStrategy {
   void init(HelixManager helixManager, TableConfig tableConfig);
 
   /**
-   * Assigns a new segment.
+   * Assigns segment to instances.
    *
    * @param segmentName Name of the segment to be assigned
    * @param currentAssignment Current segment assignment of the table (map 
from segment name to instance state map)
-   * @return List of servers to assign the segment to
+   * @param instancePartitionsMap Map from type (OFFLINE|CONSUMING|COMPLETED) 
to instance partitions
+   * @return List of instances to assign the segment to
    */
-  List<String> assignSegment(String segmentName, Map<String, Map<String, 
String>> currentAssignment);
+  List<String> assignSegment(String segmentName, Map<String, Map<String, 
String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
 
   /**
-   * Rebalances the segments for a table.
+   * Rebalances the segment assignment for a table.
    *
    * @param currentAssignment Current segment assignment of the table (map 
from segment name to instance state map)
+   * @param instancePartitionsMap Map from type (OFFLINE|CONSUMING|COMPLETED) 
to instance partitions
    * @param config Configuration for the rebalance
-   * @return the rebalanced assignment for the segments
+   * @return Rebalanced assignment for the segments
    */
   Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, 
String>> currentAssignment,
-      Configuration config);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config);
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
new file mode 100644
index 0000000..db531bb
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+
+
+/**
+ * Factory for the {@link SegmentAssignment}.
+ */
+public class SegmentAssignmentFactory {
+  private SegmentAssignmentFactory() {
+  }
+
+  public static SegmentAssignment getSegmentAssignment(HelixManager 
helixManager, TableConfig tableConfig) {
+    SegmentAssignment segmentAssignment;
+    if (tableConfig.getTableType() == TableType.OFFLINE) {
+      segmentAssignment = new OfflineSegmentAssignment();
+    } else {
+      segmentAssignment = new RealtimeSegmentAssignment();
+    }
+    segmentAssignment.init(helixManager, tableConfig);
+    return segmentAssignment;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
deleted file mode 100644
index 875b856..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.assignment.segment;
-
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-import 
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
-
-
-/**
- * Factory for the {@link SegmentAssignmentStrategy}.
- */
-public class SegmentAssignmentStrategyFactory {
-  private SegmentAssignmentStrategyFactory() {
-  }
-
-  public static SegmentAssignmentStrategy 
getSegmentAssignmentStrategy(HelixManager helixManager,
-      TableConfig tableConfig) {
-    SegmentAssignmentStrategy segmentAssignmentStrategy;
-    if (AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
-        
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()))
 {
-      if (tableConfig.getTableType() == TableType.OFFLINE) {
-        segmentAssignmentStrategy = new 
OfflineReplicaGroupSegmentAssignmentStrategy();
-      } else {
-        segmentAssignmentStrategy = new 
RealtimeReplicaGroupSegmentAssignmentStrategy();
-      }
-    } else {
-      if (tableConfig.getTableType() == TableType.OFFLINE) {
-        segmentAssignmentStrategy = new 
OfflineBalanceNumSegmentAssignmentStrategy();
-      } else {
-        segmentAssignmentStrategy = new 
RealtimeBalanceNumSegmentAssignmentStrategy();
-      }
-    }
-    segmentAssignmentStrategy.init(helixManager, tableConfig);
-    return segmentAssignmentStrategy;
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 0f3fe6f..38d9681 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -27,15 +27,11 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
-import org.apache.helix.HelixManager;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.pinot.common.config.TableConfig;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
-import org.apache.pinot.common.utils.InstancePartitionsType;
 import org.apache.pinot.common.utils.Pairs;
 import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
-import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
 
 
 /**
@@ -73,23 +69,23 @@ class SegmentAssignmentUtils {
   }
 
   /**
-   * Returns the instances for the balance number segment assignment strategy.
+   * Returns instances for non-replica-group based assignment.
    */
-  static List<String> getInstancesForBalanceNumStrategy(HelixManager 
helixManager, TableConfig tableConfig,
-      int replication, InstancePartitionsType instancePartitionsType) {
-    InstancePartitions instancePartitions =
-        InstancePartitionsUtils.fetchOrComputeInstancePartitions(helixManager, 
tableConfig, instancePartitionsType);
-    Preconditions.checkArgument(instancePartitions.getNumPartitions() == 1 && 
instancePartitions.getNumReplicas() == 1,
-        "The instance partitions: %s should contain only 1 partition and 1 
replica", instancePartitions.getName());
+  static List<String> 
getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions 
instancePartitions,
+      int replication) {
+    Preconditions.checkState(instancePartitions.getNumReplicas() == 1 && 
instancePartitions.getNumPartitions() == 1,
+        "Instance partitions: %s should contain 1 replica and 1 partition for 
non-replica-group based assignment",
+        instancePartitions.getName());
     List<String> instances = instancePartitions.getInstances(0, 0);
-    Preconditions.checkState(instances.size() >= replication,
-        "There are less instances: %d than the replication: %d for table: %s", 
instances.size(), replication,
-        tableConfig.getTableName());
+    int numInstances = instances.size();
+    Preconditions.checkState(numInstances >= replication,
+        "There are less instances: %s in instance partitions: %s than the 
table replication: %s", numInstances,
+        instancePartitions.getName(), replication);
     return instances;
   }
 
   /**
-   * Rebalances the table with Helix AutoRebalanceStrategy for the balance 
number segment assignment strategy.
+   * Rebalances the table with Helix AutoRebalanceStrategy.
    */
   static Map<String, Map<String, String>> 
rebalanceTableWithHelixAutoRebalanceStrategy(
       Map<String, Map<String, String>> currentAssignment, List<String> 
instances, int replication) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
index 9d6e7d7..004ff1f 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
@@ -283,7 +283,7 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
     // Post the CONSUMING instance partitions
     instancePartitionsMap = deserializeInstancePartitionsMap(
         
sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null),
-            JsonUtils.objectToString(consumingInstancePartitions)));
+            consumingInstancePartitions.toJsonString()));
     assertEquals(instancePartitionsMap.size(), 1);
     
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
         Collections.singletonList(realtimeInstanceId));
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
similarity index 76%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
rename to 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
index 762773b..c455274 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
@@ -19,14 +19,11 @@
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.InstancePartitionsType;
@@ -34,16 +31,11 @@ import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 
-public class OfflineBalanceNumSegmentAssignmentStrategyTest {
+public class OfflineNonReplicaGroupSegmentAssignmentTest {
   private static final int NUM_REPLICAS = 3;
   private static final String SEGMENT_NAME_PREFIX = "segment_";
   private static final int NUM_SEGMENTS = 100;
@@ -57,33 +49,26 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest 
{
   private static final String INSTANCE_PARTITIONS_NAME =
       InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME);
 
-  private SegmentAssignmentStrategy _strategy;
+  private SegmentAssignment _segmentAssignment;
+  private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMap;
 
   @BeforeClass
   public void setUp() {
+    TableConfig tableConfig =
+        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfig);
+
     // {
     //   0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, 
instance_5, instance_6, instance_7, instance_8, instance_9]
     // }
     InstancePartitions instancePartitions = new 
InstancePartitions(INSTANCE_PARTITIONS_NAME);
     instancePartitions.setInstances(0, 0, INSTANCES);
-
-    // Mock HelixManager
-    @SuppressWarnings("unchecked")
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    when(propertyStore
-        
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME)),
 any(),
-            anyInt())).thenReturn(instancePartitions.toZNRecord());
-    HelixManager helixManager = mock(HelixManager.class);
-    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
-    _strategy = 
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, 
tableConfig);
+    _instancePartitionsMap = 
Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions);
   }
 
   @Test
   public void testFactory() {
-    assertTrue(_strategy instanceof 
OfflineBalanceNumSegmentAssignmentStrategy);
+    assertTrue(_segmentAssignment instanceof OfflineSegmentAssignment);
   }
 
   @Test
@@ -98,7 +83,8 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest {
     // ...
     int expectedAssignedInstanceId = 0;
     for (String segmentName : SEGMENTS) {
-      List<String> instancesAssigned = _strategy.assignSegment(segmentName, 
currentAssignment);
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
         assertEquals(instancesAssigned.get(replicaId), 
INSTANCES.get(expectedAssignedInstanceId));
@@ -113,7 +99,8 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest {
   public void testTableBalanced() {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (String segmentName : SEGMENTS) {
-      List<String> instancesAssigned = _strategy.assignSegment(segmentName, 
currentAssignment);
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       currentAssignment.put(segmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentOnlineOfflineStateModel.ONLINE));
     }
@@ -132,6 +119,6 @@ public class OfflineBalanceNumSegmentAssignmentStrategyTest 
{
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(_strategy.rebalanceTable(currentAssignment, null), 
currentAssignment);
+    assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null), currentAssignment);
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
similarity index 86%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
rename to 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index 28586f0..089c016 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -51,7 +51,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 
-public class OfflineReplicaGroupSegmentAssignmentStrategyTest {
+public class OfflineReplicaGroupSegmentAssignmentTest {
   private static final int NUM_REPLICAS = 3;
   private static final String SEGMENT_NAME_PREFIX = "segment_";
   private static final int NUM_SEGMENTS = 90;
@@ -72,11 +72,20 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
   private static final String PARTITION_COLUMN = "partitionColumn";
   private static final int NUM_PARTITIONS = 3;
 
-  private SegmentAssignmentStrategy _strategyWithoutPartition;
-  private SegmentAssignmentStrategy _strategyWithPartition;
+  private SegmentAssignment _segmentAssignmentWithoutPartition;
+  private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMapWithoutPartition;
+  private SegmentAssignment _segmentAssignmentWithPartition;
+  private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMapWithPartition;
 
   @BeforeClass
   public void setUp() {
+    TableConfig tableConfigWithoutPartitions =
+        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION)
+            .setNumReplicas(NUM_REPLICAS)
+            
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
+    _segmentAssignmentWithoutPartition =
+        SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfigWithoutPartitions);
+
     // {
     //   0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, 
instance_5],
     //   0_1=[instance_6, instance_7, instance_8, instance_9, instance_10, 
instance_11],
@@ -93,54 +102,12 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
       }
       instancePartitionsWithoutPartition.setInstances(0, replicaId, 
instancesForReplica);
     }
-
-    // Mock HelixManager
-    @SuppressWarnings("unchecked")
-    ZkHelixPropertyStore<ZNRecord> propertyStoreWithoutPartitions = 
mock(ZkHelixPropertyStore.class);
-    when(propertyStoreWithoutPartitions.get(eq(ZKMetadataProvider
-        
.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION)),
 any(), anyInt()))
-        .thenReturn(instancePartitionsWithoutPartition.toZNRecord());
-    HelixManager helixManagerWithoutPartitions = mock(HelixManager.class);
-    
when(helixManagerWithoutPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithoutPartitions);
-
-    TableConfig tableConfigWithoutPartitions =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION)
-            .setNumReplicas(NUM_REPLICAS)
-            
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
-    _strategyWithoutPartition = SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(helixManagerWithoutPartitions, 
tableConfigWithoutPartitions);
-
-    // {
-    //   0_0=[instance_0, instance_1],
-    //   0_1=[instance_6, instance_7],
-    //   0_2=[instance_12, instance_13],
-    //   1_0=[instance_2, instance_3],
-    //   1_1=[instance_8, instance_9],
-    //   1_2=[instance_14, instance_15],
-    //   2_0=[instance_4, instance_5],
-    //   2_1=[instance_10, instance_11],
-    //   2_2=[instance_16, instance_17]
-    // }
-    InstancePartitions instancePartitionsWithPartition =
-        new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
-    int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS;
-    instanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
-        List<String> instancesForPartition = new 
ArrayList<>(numInstancesPerPartition);
-        for (int i = 0; i < numInstancesPerPartition; i++) {
-          instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
-        }
-        instancePartitionsWithPartition.setInstances(partitionId, replicaId, 
instancesForPartition);
-      }
-    }
+    _instancePartitionsMapWithoutPartition =
+        Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instancePartitionsWithoutPartition);
 
     // Mock HelixManager
     @SuppressWarnings("unchecked")
     ZkHelixPropertyStore<ZNRecord> propertyStoreWithPartitions = 
mock(ZkHelixPropertyStore.class);
-    when(propertyStoreWithPartitions.get(
-        
eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION)),
-        any(), 
anyInt())).thenReturn(instancePartitionsWithPartition.toZNRecord());
     List<ZNRecord> segmentZKMetadataZNRecords = new ArrayList<>(NUM_SEGMENTS);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = SEGMENTS.get(segmentId);
@@ -168,14 +135,41 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
             .setNumReplicas(NUM_REPLICAS)
             
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
     
tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig);
-    _strategyWithPartition = SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(helixManagerWithPartitions, 
tableConfigWithPartitions);
+    _segmentAssignmentWithPartition =
+        
SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions, 
tableConfigWithPartitions);
+
+    // {
+    //   0_0=[instance_0, instance_1],
+    //   0_1=[instance_6, instance_7],
+    //   0_2=[instance_12, instance_13],
+    //   1_0=[instance_2, instance_3],
+    //   1_1=[instance_8, instance_9],
+    //   1_2=[instance_14, instance_15],
+    //   2_0=[instance_4, instance_5],
+    //   2_1=[instance_10, instance_11],
+    //   2_2=[instance_16, instance_17]
+    // }
+    InstancePartitions instancePartitionsWithPartition =
+        new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
+    int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS;
+    instanceIdToAdd = 0;
+    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
+      for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
+        List<String> instancesForPartition = new 
ArrayList<>(numInstancesPerPartition);
+        for (int i = 0; i < numInstancesPerPartition; i++) {
+          instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
+        }
+        instancePartitionsWithPartition.setInstances(partitionId, replicaId, 
instancesForPartition);
+      }
+    }
+    _instancePartitionsMapWithPartition =
+        Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instancePartitionsWithPartition);
   }
 
   @Test
   public void testFactory() {
-    assertTrue(_strategyWithoutPartition instanceof 
OfflineReplicaGroupSegmentAssignmentStrategy);
-    assertTrue(_strategyWithPartition instanceof 
OfflineReplicaGroupSegmentAssignmentStrategy);
+    assertTrue(_segmentAssignmentWithoutPartition instanceof 
OfflineSegmentAssignment);
+    assertTrue(_segmentAssignmentWithPartition instanceof 
OfflineSegmentAssignment);
   }
 
   @Test
@@ -184,7 +178,8 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = SEGMENTS.get(segmentId);
-      List<String> instancesAssigned = 
_strategyWithoutPartition.assignSegment(segmentName, currentAssignment);
+      List<String> instancesAssigned = _segmentAssignmentWithoutPartition
+          .assignSegment(segmentName, currentAssignment, 
_instancePartitionsMapWithoutPartition);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
 
@@ -212,7 +207,8 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
     int numInstancesPerPartition = numInstancesPerReplica / NUM_PARTITIONS;
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = SEGMENTS.get(segmentId);
-      List<String> instancesAssigned = 
_strategyWithPartition.assignSegment(segmentName, currentAssignment);
+      List<String> instancesAssigned = _segmentAssignmentWithPartition
+          .assignSegment(segmentName, currentAssignment, 
_instancePartitionsMapWithPartition);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       int partitionId = segmentId % NUM_PARTITIONS;
       for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
@@ -240,7 +236,8 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
   public void testTableBalancedWithoutPartition() {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (String segmentName : SEGMENTS) {
-      List<String> instancesAssigned = 
_strategyWithoutPartition.assignSegment(segmentName, currentAssignment);
+      List<String> instancesAssigned = _segmentAssignmentWithoutPartition
+          .assignSegment(segmentName, currentAssignment, 
_instancePartitionsMapWithoutPartition);
       currentAssignment.put(segmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentOnlineOfflineStateModel.ONLINE));
     }
@@ -259,14 +256,16 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(_strategyWithoutPartition.rebalanceTable(currentAssignment, 
null), currentAssignment);
+    assertEquals(_segmentAssignmentWithoutPartition
+        .rebalanceTable(currentAssignment, 
_instancePartitionsMapWithoutPartition, null), currentAssignment);
   }
 
   @Test
   public void testTableBalancedWithPartition() {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (String segmentName : SEGMENTS) {
-      List<String> instancesAssigned = 
_strategyWithPartition.assignSegment(segmentName, currentAssignment);
+      List<String> instancesAssigned = _segmentAssignmentWithPartition
+          .assignSegment(segmentName, currentAssignment, 
_instancePartitionsMapWithPartition);
       currentAssignment.put(segmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentOnlineOfflineStateModel.ONLINE));
     }
@@ -285,6 +284,8 @@ public class 
OfflineReplicaGroupSegmentAssignmentStrategyTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(_strategyWithPartition.rebalanceTable(currentAssignment, 
null), currentAssignment);
+    assertEquals(
+        _segmentAssignmentWithPartition.rebalanceTable(currentAssignment, 
_instancePartitionsMapWithPartition, null),
+        currentAssignment);
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
similarity index 80%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
rename to 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 0aa454c..5fee01a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -23,11 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.CommonConstants;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.InstancePartitionsType;
@@ -37,16 +33,11 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConst
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 
-public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
+public class RealtimeNonReplicaGroupSegmentAssignmentTest {
   private static final int NUM_REPLICAS = 3;
   private static final int NUM_PARTITIONS = 4;
   private static final int NUM_SEGMENTS = 100;
@@ -65,7 +56,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
       
InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME);
 
   private List<String> _segments;
-  private SegmentAssignmentStrategy _strategy;
+  private SegmentAssignment _segmentAssignment;
+  private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMap;
 
   @BeforeClass
   public void setUp() {
@@ -75,7 +67,13 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest 
{
           System.currentTimeMillis()).getSegmentName());
     }
 
-    // Consuming instances:
+    TableConfig tableConfig =
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            .setNumReplicas(NUM_REPLICAS).setLLC(true).build();
+    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfig);
+
+    _instancePartitionsMap = new TreeMap<>();
+    // CONSUMING instances:
     // {
     //   0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, 
instance_5, instance_6, instance_7, instance_8]
     // }
@@ -83,35 +81,20 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
     //        p3r0        p3r1        p3r2
     InstancePartitions consumingInstancePartitions = new 
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
     consumingInstancePartitions.setInstances(0, 0, CONSUMING_INSTANCES);
+    _instancePartitionsMap.put(InstancePartitionsType.CONSUMING, 
consumingInstancePartitions);
 
-    // Completed instances:
+    // COMPLETED instances:
     // {
     //   0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, 
instance_5, instance_6, instance_7, instance_8, instance_9]
     // }
     InstancePartitions completedInstancePartitions = new 
InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME);
     completedInstancePartitions.setInstances(0, 0, COMPLETED_INSTANCES);
-
-    // Mock HelixManager
-    @SuppressWarnings("unchecked")
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    when(propertyStore
-        
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)),
-            any(), 
anyInt())).thenReturn(consumingInstancePartitions.toZNRecord());
-    when(propertyStore
-        
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)),
-            any(), 
anyInt())).thenReturn(completedInstancePartitions.toZNRecord());
-    HelixManager helixManager = mock(HelixManager.class);
-    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-            .setNumReplicas(NUM_REPLICAS).setLLC(true).build();
-    _strategy = 
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, 
tableConfig);
+    _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, 
completedInstancePartitions);
   }
 
   @Test
   public void testFactory() {
-    assertTrue(_strategy instanceof 
RealtimeBalanceNumSegmentAssignmentStrategy);
+    assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
   }
 
   @Test
@@ -119,7 +102,8 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
-      List<String> instancesAssigned = _strategy.assignSegment(segmentName, 
currentAssignment);
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
 
@@ -143,7 +127,8 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
-      List<String> instancesAssigned = _strategy.assignSegment(segmentName, 
currentAssignment);
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
 
@@ -154,20 +139,20 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
       assertEquals(instanceStateMap.size(), NUM_REPLICAS);
     }
 
-    // Rebalance should relocate all completed (ONLINE) segments to the 
completed instances
+    // Rebalance should relocate all COMPLETED (ONLINE) segments to the 
COMPLETED instances
     Map<String, Map<String, String>> newAssignment =
-        _strategy.rebalanceTable(currentAssignment, new BaseConfiguration());
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, new BaseConfiguration());
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
-        // Completed (ONLINE) segments
+        // COMPLETED (ONLINE) segments
         Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
           assertEquals(entry.getValue(), 
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
         }
       } else {
-        // Consuming (CONSUMING) segments
+        // CONSUMING segments
         Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
@@ -184,10 +169,10 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
       assertTrue(numSegmentsAssignedPerInstance[i] >= 
expectedMinNumSegmentsPerInstance);
     }
 
-    // Rebalance all segments (both completed and consuming) should give the 
same assignment
+    // Rebalance all segments (including CONSUMING) should give the same 
assignment
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
-    assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
+    assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, config), newAssignment);
 
     // Rebalance should not change the assignment for the OFFLINE segments
     String offlineSegmentName = "offlineSegment";
@@ -196,7 +181,7 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
             RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
     currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
     newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
-    assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
+    assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, config), newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
similarity index 82%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
rename to 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index f74a7b2..04fc344 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -24,11 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import 
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
@@ -39,16 +35,11 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConst
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 
-public class RealtimeReplicaGroupSegmentAssignmentStrategyTest {
+public class RealtimeReplicaGroupSegmentAssignmentTest {
   private static final int NUM_REPLICAS = 3;
   private static final int NUM_PARTITIONS = 4;
   private static final int NUM_SEGMENTS = 100;
@@ -67,7 +58,8 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
       
InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME);
 
   private List<String> _segments;
-  private SegmentAssignmentStrategy _strategy;
+  private SegmentAssignment _segmentAssignment;
+  private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMap;
 
   @BeforeClass
   public void setUp() {
@@ -77,7 +69,14 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
           System.currentTimeMillis()).getSegmentName());
     }
 
-    // Consuming instances:
+    TableConfig tableConfig =
+        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+            
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+            .build();
+    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfig);
+
+    _instancePartitionsMap = new TreeMap<>();
+    // CONSUMING instances:
     // {
     //   0_0=[instance_0, instance_1, instance_2],
     //   0_1=[instance_3, instance_4, instance_5],
@@ -95,8 +94,9 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
       }
       consumingInstancePartitions.setInstances(0, replicaId, 
consumingInstancesForReplica);
     }
+    _instancePartitionsMap.put(InstancePartitionsType.CONSUMING, 
consumingInstancePartitions);
 
-    // Completed instances:
+    // COMPLETED instances:
     // {
     //   0_0=[instance_0, instance_1, instance_2, instance_3],
     //   0_1=[instance_4, instance_5, instance_6, instance_7],
@@ -112,29 +112,12 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
       }
       completedInstancePartitions.setInstances(0, replicaId, 
completedInstancesForReplica);
     }
-
-    // Mock HelixManager
-    @SuppressWarnings("unchecked")
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    when(propertyStore
-        
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)),
-            any(), 
anyInt())).thenReturn(consumingInstancePartitions.toZNRecord());
-    when(propertyStore
-        
.get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)),
-            any(), 
anyInt())).thenReturn(completedInstancePartitions.toZNRecord());
-    HelixManager helixManager = mock(HelixManager.class);
-    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
-            .build();
-    _strategy = 
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, 
tableConfig);
+    _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, 
completedInstancePartitions);
   }
 
   @Test
   public void testFactory() {
-    assertTrue(_strategy instanceof 
RealtimeReplicaGroupSegmentAssignmentStrategy);
+    assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
   }
 
   @Test
@@ -143,7 +126,8 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
-      List<String> instancesAssigned = _strategy.assignSegment(segmentName, 
currentAssignment);
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
 
@@ -167,7 +151,8 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
-      List<String> instancesAssigned = _strategy.assignSegment(segmentName, 
currentAssignment);
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
 
@@ -178,20 +163,20 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
       assertEquals(instanceStateMap.size(), NUM_REPLICAS);
     }
 
-    // Rebalance should relocate all completed (ONLINE) segments to the 
completed instances
+    // Rebalance should relocate all COMPLETED (ONLINE) segments to the 
COMPLETED instances
     Map<String, Map<String, String>> newAssignment =
-        _strategy.rebalanceTable(currentAssignment, new BaseConfiguration());
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, new BaseConfiguration());
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
-        // Completed (ONLINE) segments
+        // COMPLETED (ONLINE) segments
         Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
           assertEquals(entry.getValue(), 
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
         }
       } else {
-        // Consuming (CONSUMING) segments
+        // CONSUMING segments
         Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
@@ -207,10 +192,10 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
 
-    // Rebalance all segments (both completed and consuming) should give the 
same assignment
+    // Rebalance all segments (including CONSUMING) should give the same 
assignment
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
-    assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
+    assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, config), newAssignment);
 
     // Rebalance should not change the assignment for the OFFLINE segments
     String offlineSegmentName = "offlineSegment";
@@ -219,7 +204,7 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
             RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
     currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
     newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
-    assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
+    assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, config), newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to