This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e41bc3  [Instance Assignment] Plug in instance assignment (#4516)
4e41bc3 is described below

commit 4e41bc36b156b8f1d54619eb79fe161eed2c42c9
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Aug 15 17:29:41 2019 -0700

    [Instance Assignment] Plug in instance assignment (#4516)
    
    - Add instance assignment APIs including:
      - GET/PUT/DELETE instance partitions
      - Assign instances for a table
      - Replace instance in instance partitions
    - Assign instances if necessary while creating/updating table config
    - Backward-compatible, no change to the cluster if instance assignment is 
not configured
---
 .../instance/InstanceAssignmentConfigUtils.java    |  84 ++---
 .../pinot/common/metadata/ZKMetadataProvider.java  |   8 -
 .../PinotInstanceAssignmentRestletResource.java    | 301 ++++++++++++++++++
 .../helix/ControllerRequestURLBuilder.java         |  38 ++-
 .../helix/core/PinotHelixResourceManager.java      | 183 +++++------
 .../helix/core/assignment/InstancePartitions.java  |   8 +-
 .../core/assignment/InstancePartitionsUtils.java   |  56 +++-
 .../instance/InstanceAssignmentDriver.java         |   6 +-
 .../instance/InstanceReplicaPartitionSelector.java |   7 +-
 ...PinotInstanceAssignmentRestletResourceTest.java | 339 +++++++++++++++++++++
 .../instance/InstanceAssignmentTest.java           |  93 +-----
 .../ReplicaGroupRebalanceStrategyTest.java         |   4 +-
 .../sharding/SegmentAssignmentStrategyTest.java    |  72 -----
 .../apache/pinot/core/util/ReplicationUtils.java   |  35 ---
 14 files changed, 902 insertions(+), 332 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
index f094527..446f107 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
@@ -33,8 +33,40 @@ public class InstanceAssignmentConfigUtils {
   private InstanceAssignmentConfigUtils() {
   }
 
+  /**
+   * Returns whether the instance assignment is allowed for the given table 
config.
+   */
+  public static boolean allowInstanceAssignment(TableConfig tableConfig,
+      InstancePartitionsType instancePartitionsType) {
+    TableType tableType = tableConfig.getTableType();
+    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
+        tableConfig.getInstanceAssignmentConfigMap();
+    switch (instancePartitionsType) {
+      // Allow OFFLINE instance assignment if the OFFLINE table has it 
configured or (for backward-compatibility) is
+      // using replica-group segment assignment
+      case OFFLINE:
+        return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap 
!= null && instanceAssignmentConfigMap
+            .containsKey(InstancePartitionsType.OFFLINE))
+            || AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
+            
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
+      // Allow CONSUMING/COMPLETED instance assignment if the REALTIME table 
has it configured
+      case CONSUMING:
+      case COMPLETED:
+        return tableType == TableType.REALTIME && (instanceAssignmentConfigMap 
!= null && instanceAssignmentConfigMap
+            .containsKey(instancePartitionsType));
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Extracts or generates default instance assignment config from the given 
table config.
+   */
   public static InstanceAssignmentConfig 
getInstanceAssignmentConfig(TableConfig tableConfig,
       InstancePartitionsType instancePartitionsType) {
+    Preconditions.checkState(allowInstanceAssignment(tableConfig, 
instancePartitionsType),
+        "Instance assignment is not allowed for the given table config");
+
     // Use the instance assignment config from the table config if it exists
     Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
         tableConfig.getInstanceAssignmentConfigMap();
@@ -42,49 +74,33 @@ public class InstanceAssignmentConfigUtils {
       if (instanceAssignmentConfigMap.containsKey(instancePartitionsType)) {
         return instanceAssignmentConfigMap.get(instancePartitionsType);
       }
-      // Use the CONSUMING config for COMPLETED segments if COMPLETED config 
does not exist
-      if (instancePartitionsType == InstancePartitionsType.COMPLETED && 
instanceAssignmentConfigMap
-          .containsKey(InstancePartitionsType.CONSUMING)) {
-        return 
instanceAssignmentConfigMap.get(InstancePartitionsType.CONSUMING);
-      }
     }
 
     // Generate default instance assignment config if it does not exist
+    // Only allow default config for OFFLINE table with replica-group segment 
assignment for backward-compatibility
     InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig();
 
     InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig();
-    String serverTag =
-        
TagNameUtils.getServerTagFromTableConfigAndInstancePartitionsType(tableConfig, 
instancePartitionsType);
-    tagPoolConfig.setTag(serverTag);
+    
tagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(tableConfig.getTenantConfig().getServer()));
     instanceAssignmentConfig.setTagPoolConfig(tagPoolConfig);
 
     InstanceReplicaPartitionConfig replicaPartitionConfig = new 
InstanceReplicaPartitionConfig();
-    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
-    if (AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
-        .equalsIgnoreCase(validationConfig.getSegmentAssignmentStrategy())) {
-      replicaPartitionConfig.setReplicaGroupBased(true);
-      if (tableConfig.getTableType() == TableType.OFFLINE) {
-        
replicaPartitionConfig.setNumReplicas(validationConfig.getReplicationNumber());
-        ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
-            tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
-        Preconditions
-            .checkState(replicaGroupStrategyConfig != null, "Failed to find 
the replica-group strategy config");
-        String partitionColumn = 
replicaGroupStrategyConfig.getPartitionColumn();
-        if (partitionColumn != null) {
-          int numPartitions =
-              
tableConfig.getIndexingConfig().getSegmentPartitionConfig().getNumPartitions(partitionColumn);
-          Preconditions.checkState(numPartitions > 0, "Number of partitions 
for column: %s is not properly configured",
-              partitionColumn);
-          replicaPartitionConfig.setNumPartitions(numPartitions);
-          
replicaPartitionConfig.setNumInstancesPerPartition(replicaGroupStrategyConfig.getNumInstancesPerPartition());
-        } else {
-          // If partition column is not configured, use 
replicaGroupStrategyConfig.getNumInstancesPerPartition() as
-          // number of instances per replica for backward-compatibility
-          
replicaPartitionConfig.setNumInstancesPerReplica(replicaGroupStrategyConfig.getNumInstancesPerPartition());
-        }
-      } else {
-        
replicaPartitionConfig.setNumReplicas(validationConfig.getReplicasPerPartitionNumber());
-      }
+    replicaPartitionConfig.setReplicaGroupBased(true);
+    SegmentsValidationAndRetentionConfig segmentConfig = 
tableConfig.getValidationConfig();
+    
replicaPartitionConfig.setNumReplicas(segmentConfig.getReplicationNumber());
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig = 
segmentConfig.getReplicaGroupStrategyConfig();
+    Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to 
find the replica-group strategy config");
+    String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
+    if (partitionColumn != null) {
+      int numPartitions = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig().getNumPartitions(partitionColumn);
+      Preconditions.checkState(numPartitions > 0, "Number of partitions for 
column: %s is not properly configured",
+          partitionColumn);
+      replicaPartitionConfig.setNumPartitions(numPartitions);
+      
replicaPartitionConfig.setNumInstancesPerPartition(replicaGroupStrategyConfig.getNumInstancesPerPartition());
+    } else {
+      // If partition column is not configured, use 
replicaGroupStrategyConfig.getNumInstancesPerPartition() as
+      // number of instances per replica for backward-compatibility
+      
replicaPartitionConfig.setNumInstancesPerReplica(replicaGroupStrategyConfig.getNumInstancesPerPartition());
     }
     instanceAssignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig);
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 2a3ad60..f8baf67 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -136,14 +136,6 @@ public class ZKMetadataProvider {
     }
   }
 
-  public static void 
removeInstancePartitionAssignmentFromPropertyStore(ZkHelixPropertyStore<ZNRecord>
 propertyStore,
-      String tableNameWithType) {
-    String propertyStorePath = 
constructPropertyStorePathForInstancePartitions(tableNameWithType);
-    if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
-      propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
-    }
-  }
-
   public static boolean 
setOfflineSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
       String offlineTableName, OfflineSegmentZKMetadata 
offlineSegmentZKMetadata, int expectedVersion) {
     // NOTE: Helix will throw ZkBadVersionException if version does not match
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
new file mode 100644
index 0000000..9f8d48f
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
+import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotInstanceAssignmentRestletResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotInstanceAssignmentRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _resourceManager;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/instancePartitions")
+  @ApiOperation(value = "Get the instance partitions")
+  public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+      @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable InstancePartitionsType instancePartitionsType) {
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
+
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType != TableType.REALTIME) {
+      if (instancePartitionsType == InstancePartitionsType.OFFLINE || 
instancePartitionsType == null) {
+        InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
+            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                
InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
+        if (offlineInstancePartitions != null) {
+          instancePartitionsMap.put(InstancePartitionsType.OFFLINE, 
offlineInstancePartitions);
+        }
+      }
+    }
+    if (tableType != TableType.OFFLINE) {
+      if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
+        InstancePartitions consumingInstancePartitions = 
InstancePartitionsUtils
+            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
+        if (consumingInstancePartitions != null) {
+          instancePartitionsMap.put(InstancePartitionsType.CONSUMING, 
consumingInstancePartitions);
+        }
+      }
+      if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
+        InstancePartitions completedInstancePartitions = 
InstancePartitionsUtils
+            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
+        if (completedInstancePartitions != null) {
+          instancePartitionsMap.put(InstancePartitionsType.COMPLETED, 
completedInstancePartitions);
+        }
+      }
+    }
+
+    if (instancePartitionsMap.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find the 
instance partitions",
+          Response.Status.NOT_FOUND);
+    } else {
+      return instancePartitionsMap;
+    }
+  }
+
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/assignInstances")
+  @ApiOperation(value = "Assign server instances to a table")
+  public Map<InstancePartitionsType, InstancePartitions> assignInstances(
+      @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable InstancePartitionsType instancePartitionsType,
+      @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") 
@QueryParam("dryRun") boolean dryRun) {
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
+    List<InstanceConfig> instanceConfigs = 
_resourceManager.getAllHelixInstanceConfigs();
+
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType != TableType.REALTIME && (instancePartitionsType == 
InstancePartitionsType.OFFLINE
+        || instancePartitionsType == null)) {
+      TableConfig offlineTableConfig = 
_resourceManager.getOfflineTableConfig(tableName);
+      if (offlineTableConfig != null) {
+        try {
+          if (InstanceAssignmentConfigUtils
+              .allowInstanceAssignment(offlineTableConfig, 
InstancePartitionsType.OFFLINE)) {
+            instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new 
InstanceAssignmentDriver(offlineTableConfig)
+                .assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs));
+          }
+        } catch (IllegalStateException e) {
+          throw new ControllerApplicationException(LOGGER, "Caught 
IllegalStateException", Response.Status.BAD_REQUEST,
+              e);
+        } catch (Exception e) {
+          throw new ControllerApplicationException(LOGGER, "Caught exception 
while calculating the instance partitions",
+              Response.Status.INTERNAL_SERVER_ERROR, e);
+        }
+      }
+    }
+    if (tableType != TableType.OFFLINE && instancePartitionsType != 
InstancePartitionsType.OFFLINE) {
+      TableConfig realtimeTableConfig = 
_resourceManager.getRealtimeTableConfig(tableName);
+      if (realtimeTableConfig != null) {
+        try {
+          InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(realtimeTableConfig);
+          if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
+            if (InstanceAssignmentConfigUtils
+                .allowInstanceAssignment(realtimeTableConfig, 
InstancePartitionsType.CONSUMING)) {
+              instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
+                  
instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, 
instanceConfigs));
+            }
+          }
+          if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
+            if (InstanceAssignmentConfigUtils
+                .allowInstanceAssignment(realtimeTableConfig, 
InstancePartitionsType.COMPLETED)) {
+              instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
+                  
instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, 
instanceConfigs));
+            }
+          }
+        } catch (IllegalStateException e) {
+          throw new ControllerApplicationException(LOGGER, "Caught 
IllegalStateException", Response.Status.BAD_REQUEST,
+              e);
+        } catch (Exception e) {
+          throw new ControllerApplicationException(LOGGER, "Caught exception 
while calculating the instance partitions",
+              Response.Status.INTERNAL_SERVER_ERROR, e);
+        }
+      }
+    }
+
+    if (instancePartitionsMap.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find the 
instance assignment config",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (!dryRun) {
+      for (InstancePartitions instancePartitions : 
instancePartitionsMap.values()) {
+        persistInstancePartitionsHelper(instancePartitions);
+      }
+    }
+
+    return instancePartitionsMap;
+  }
+
+  private void persistInstancePartitionsHelper(InstancePartitions 
instancePartitions) {
+    try {
+      LOGGER.info("Persisting instance partitions: {} to the property store", 
instancePartitions.getName());
+      
InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(),
 instancePartitions);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Caught Exception while 
persisting the instance partitions",
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @PUT
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/instancePartitions")
+  @ApiOperation(value = "Create/update the instance partitions")
+  public Map<InstancePartitionsType, InstancePartitions> setInstancePartitions(
+      @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName, String instancePartitionsStr) {
+    InstancePartitions instancePartitions;
+    try {
+      instancePartitions = JsonUtils.stringToObject(instancePartitionsStr, 
InstancePartitions.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to deserialize 
the instance partitions",
+          Response.Status.BAD_REQUEST);
+    }
+
+    String instancePartitionsName = instancePartitions.getName();
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType != TableType.REALTIME) {
+      if 
(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
 {
+        persistInstancePartitionsHelper(instancePartitions);
+        return Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instancePartitions);
+      }
+    }
+    if (tableType != TableType.OFFLINE) {
+      if 
(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
 {
+        persistInstancePartitionsHelper(instancePartitions);
+        return Collections.singletonMap(InstancePartitionsType.CONSUMING, 
instancePartitions);
+      }
+      if 
(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
 {
+        persistInstancePartitionsHelper(instancePartitions);
+        return Collections.singletonMap(InstancePartitionsType.COMPLETED, 
instancePartitions);
+      }
+    }
+
+    throw new ControllerApplicationException(LOGGER, "Instance partitions 
cannot be applied to the table",
+        Response.Status.BAD_REQUEST);
+  }
+
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/instancePartitions")
+  @ApiOperation(value = "Remove the instance partitions")
+  public SuccessResponse removeInstancePartitions(
+      @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable InstancePartitionsType instancePartitionsType) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType != TableType.REALTIME && (instancePartitionsType == 
InstancePartitionsType.OFFLINE
+        || instancePartitionsType == null)) {
+      
removeInstancePartitionsHelper(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
+    }
+    if (tableType != TableType.OFFLINE) {
+      if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
+        
removeInstancePartitionsHelper(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
+      }
+      if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
+        
removeInstancePartitionsHelper(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
+      }
+    }
+    return new SuccessResponse("Instance partitions removed");
+  }
+
+  private void removeInstancePartitionsHelper(String instancePartitionsName) {
+    try {
+      LOGGER.info("Removing instance partitions: {} from the property store", 
instancePartitionsName);
+      
InstancePartitionsUtils.removeInstancePartitions(_resourceManager.getPropertyStore(),
 instancePartitionsName);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Caught Exception while 
removing the instance partitions",
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/replaceInstance")
+  @ApiOperation(value = "Replace an instance in the instance partitions")
+  public Map<InstancePartitionsType, InstancePartitions> replaceInstance(
+      @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable InstancePartitionsType instancePartitionsType,
+      @ApiParam(value = "Old instance to be replaced", required = true) 
@QueryParam("oldInstanceId") String oldInstanceId,
+      @ApiParam(value = "New instance to replace with", required = true) 
@QueryParam("newInstanceId") String newInstanceId) {
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        getInstancePartitions(tableName, instancePartitionsType);
+    Iterator<InstancePartitions> iterator = 
instancePartitionsMap.values().iterator();
+    while (iterator.hasNext()) {
+      InstancePartitions instancePartitions = iterator.next();
+      boolean oldInstanceFound = false;
+      Map<String, List<String>> partitionToInstancesMap = 
instancePartitions.getPartitionToInstancesMap();
+      for (List<String> instances : partitionToInstancesMap.values()) {
+        oldInstanceFound |= Collections.replaceAll(instances, oldInstanceId, 
newInstanceId);
+      }
+      if (oldInstanceFound) {
+        persistInstancePartitionsHelper(instancePartitions);
+      } else {
+        iterator.remove();
+      }
+    }
+    if (instancePartitionsMap.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find the old 
instance", Response.Status.NOT_FOUND);
+    } else {
+      return instancePartitionsMap;
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index 6dc7b40..d638392 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -18,8 +18,9 @@
  */
 package org.apache.pinot.controller.helix;
 
-import org.apache.avro.reflect.Nullable;
+import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.utils.InstancePartitionsType;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.URIUtils;
 
@@ -202,4 +203,39 @@ public class ControllerRequestURLBuilder {
   public String forSegmentListAPI(String tableName) {
     return StringUtil.join("/", _baseUrl, "segments", tableName);
   }
+
+  public String forInstancePartitions(String tableName, @Nullable 
InstancePartitionsType instancePartitionsType) {
+    String url = StringUtil.join("/", _baseUrl, "tables", tableName, 
"instancePartitions");
+    if (instancePartitionsType != null) {
+      url += "?type=" + instancePartitionsType;
+    }
+    return url;
+  }
+
+  public String forInstanceAssign(String tableName, @Nullable 
InstancePartitionsType instancePartitionsType,
+      boolean dryRun) {
+    String url = StringUtil.join("/", _baseUrl, "tables", tableName, 
"assignInstances");
+    if (instancePartitionsType != null) {
+      url += "?type=" + instancePartitionsType;
+      if (dryRun) {
+        url += "&dryRun=true";
+      }
+    } else {
+      if (dryRun) {
+        url += "?dryRun=true";
+      }
+    }
+    return url;
+  }
+
+  public String forInstanceReplace(String tableName, @Nullable 
InstancePartitionsType instancePartitionsType,
+      String oldInstanceId, String newInstanceId) {
+    String url =
+        StringUtil.join("/", _baseUrl, "tables", tableName, "replaceInstance") 
+ "?oldInstanceId=" + oldInstanceId
+            + "&newInstanceId=" + newInstanceId;
+    if (instancePartitionsType != null) {
+      url += "&type=" + instancePartitionsType;
+    }
+    return url;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d4579de..c7fb073 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -72,6 +72,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.config.Tenant;
 import org.apache.pinot.common.config.TenantConfig;
+import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.TableNotFoundException;
@@ -83,14 +84,14 @@ import 
org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
-import 
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
 import org.apache.pinot.common.restlet.resources.RebalanceResult;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import 
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
@@ -98,16 +99,17 @@ import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.resources.StateType;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
+import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import 
org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
-import 
org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
 import 
org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.util.ReplicationUtils;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1024,8 +1026,9 @@ public class PinotHelixResourceManager {
     validateTableTenantConfig(tableConfig);
 
     String tableNameWithType = tableConfig.getTableName();
-    TableType tableType = tableConfig.getTableType();
     SegmentsValidationAndRetentionConfig segmentsConfig = 
tableConfig.getValidationConfig();
+
+    TableType tableType = tableConfig.getTableType();
     switch (tableType) {
       case OFFLINE:
         // existing tooling relies on this check not existing for realtime 
table (to migrate to LLC)
@@ -1040,14 +1043,16 @@ public class PinotHelixResourceManager {
                 _enableBatchMessageMode);
         LOGGER.info("adding table via the admin");
         _helixAdmin.addResource(_helixClusterName, tableNameWithType, 
offlineIdealState);
-        LOGGER.info("successfully added the table : " + tableNameWithType + " 
to the cluster");
 
         // lets add table configs
         ZKMetadataProvider.setOfflineTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
 
-        // Update replica group partition assignment to the property store if 
applicable
-        updateReplicaGroupPartitionAssignment(tableConfig);
+        // Assign instances
+        assignInstances(tableConfig, true);
+
+        LOGGER.info("Successfully added table: {}", tableNameWithType);
         break;
+
       case REALTIME:
         IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
         verifyIndexingConfig(tableNameWithType, indexingConfig);
@@ -1067,11 +1072,6 @@ public class PinotHelixResourceManager {
         // lets add table configs
         ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
 
-        // Update replica group partition assignment to the property store if 
applicable
-        if (ReplicationUtils.setupRealtimeReplicaGroups(tableConfig)) {
-          updateReplicaGroupPartitionAssignment(tableConfig);
-        }
-
         /*
          * PinotRealtimeSegmentManager sets up watches on table and segment 
path. When a table gets created,
          * it expects the INSTANCE path in propertystore to be set up so that 
it can get the group ID and
@@ -1086,10 +1086,14 @@ public class PinotHelixResourceManager {
          */
         ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, 
indexingConfig);
 
+        // Assign instances
+        assignInstances(tableConfig, true);
+
         LOGGER.info("Successfully added or updated the table {} ", 
tableNameWithType);
         break;
+
       default:
-        throw new InvalidTableConfigException("UnSupported table type: " + 
tableType);
+        throw new InvalidTableConfigException("Unsupported table type: " + 
tableType);
     }
 
     String brokerTenantName = 
TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
@@ -1132,43 +1136,6 @@ public class PinotHelixResourceManager {
     }
   }
 
-  /**
-   * Update replica group partition assignment in the property store
-   *
-   * @param tableConfig a table config
-   */
-  private void updateReplicaGroupPartitionAssignment(TableConfig tableConfig) {
-    String tableNameWithType = tableConfig.getTableName();
-    String assignmentStrategy = 
tableConfig.getValidationConfig().getSegmentAssignmentStrategy();
-    // We create replica group partition assignment and write to property 
store if new table config
-    // has the replica group config.
-    if (assignmentStrategy != null && 
SegmentAssignmentStrategyEnum.valueOf(assignmentStrategy)
-        == 
SegmentAssignmentStrategyEnum.ReplicaGroupSegmentAssignmentStrategy) {
-      ReplicaGroupPartitionAssignmentGenerator partitionAssignmentGenerator =
-          new ReplicaGroupPartitionAssignmentGenerator(_propertyStore);
-
-      // Create the new replica group partition assignment if there is none in 
the property store.
-      // This will create the replica group partition assignment and write to 
the property store in 2 cases:
-      // 1. when we create the table with replica group segment assignment
-      // 2. when we update the table config with replica group segment 
assignment from another assignment strategy
-      if 
(partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType)
 == null) {
-        TableType tableType = tableConfig.getTableType();
-        List<String> servers;
-        if (tableType.equals(TableType.OFFLINE)) {
-          OfflineTagConfig offlineTagConfig = new 
OfflineTagConfig(tableConfig);
-          servers = 
getInstancesWithTag(offlineTagConfig.getOfflineServerTag());
-        } else {
-          RealtimeTagConfig realtimeTagConfig = new 
RealtimeTagConfig(tableConfig);
-          servers = 
getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
-        }
-        int numReplicas = ReplicationUtils.getReplication(tableConfig);
-        ReplicaGroupPartitionAssignment partitionAssignment = 
partitionAssignmentGenerator
-            .buildReplicaGroupPartitionAssignment(tableNameWithType, 
tableConfig, numReplicas, servers);
-        
partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
-      }
-    }
-  }
-
   public static class InvalidTableConfigException extends RuntimeException {
     public InvalidTableConfigException(String message) {
       super(message);
@@ -1268,6 +1235,35 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private void assignInstances(TableConfig tableConfig, boolean override)
+      throws IOException {
+    String tableNameWithType = tableConfig.getTableName();
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+
+    List<InstancePartitionsType> instancePartitionsTypesToAssign = new 
ArrayList<>();
+    for (InstancePartitionsType instancePartitionsType : 
InstancePartitionsType.values()) {
+      if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, 
instancePartitionsType)) {
+        if (override || InstancePartitionsUtils
+            .fetchInstancePartitions(_propertyStore, 
instancePartitionsType.getInstancePartitionsName(rawTableName))
+            == null) {
+          instancePartitionsTypesToAssign.add(instancePartitionsType);
+        }
+      }
+    }
+
+    if (!instancePartitionsTypesToAssign.isEmpty()) {
+      LOGGER.info("Assigning {} instances to table: {}", 
instancePartitionsTypesToAssign, tableNameWithType);
+      InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
+      List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
+      for (InstancePartitionsType instancePartitionsType : 
instancePartitionsTypesToAssign) {
+        InstancePartitions instancePartitions =
+            instanceAssignmentDriver.assignInstances(instancePartitionsType, 
instanceConfigs);
+        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
+        LOGGER.info("Persisted instance partitions: {}", 
JsonUtils.objectToString(instancePartitions));
+      }
+    }
+  }
+
   /**
    * Validate the table config and update it
    * @throws IOException
@@ -1284,34 +1280,42 @@ public class PinotHelixResourceManager {
   public void setExistingTableConfig(TableConfig tableConfig)
       throws IOException {
     String tableNameWithType = tableConfig.getTableName();
+    SegmentsValidationAndRetentionConfig segmentsConfig = 
tableConfig.getValidationConfig();
+
     TableType tableType = tableConfig.getTableType();
-    if (tableType == TableType.REALTIME) {
-      IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
-      verifyIndexingConfig(tableNameWithType, indexingConfig);
-      // Update replica group partition assignment to the property store if 
applicable
-      if (ReplicationUtils.setupRealtimeReplicaGroups(tableConfig)) {
-        updateReplicaGroupPartitionAssignment(tableConfig);
-      }
-      ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
-      ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, 
indexingConfig);
-    } else if (tableType == TableType.OFFLINE) {
-      // Update replica group partition assignment to the property store if 
applicable
-      updateReplicaGroupPartitionAssignment(tableConfig);
-
-      ZKMetadataProvider.setOfflineTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
-      IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
-      final String configReplication = 
tableConfig.getValidationConfig().getReplication();
-      if (configReplication != null && 
!tableConfig.getValidationConfig().getReplication()
-          .equals(idealState.getReplicas())) {
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new 
Function<IdealState, IdealState>() {
-          @Nullable
-          @Override
-          public IdealState apply(@Nullable IdealState idealState) {
-            idealState.setReplicas(configReplication);
-            return idealState;
-          }
-        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
-      }
+    switch (tableType) {
+      case OFFLINE:
+        ZKMetadataProvider.setOfflineTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
+
+        // Update IdealState replication
+        IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+        String replicationConfigured = segmentsConfig.getReplication();
+        if (!idealState.getReplicas().equals(replicationConfigured)) {
+          HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is 
-> {
+            assert is != null;
+            is.setReplicas(replicationConfigured);
+            return is;
+          }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+        }
+
+        // Assign instances
+        assignInstances(tableConfig, false);
+
+        break;
+
+      case REALTIME:
+        IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+        verifyIndexingConfig(tableNameWithType, indexingConfig);
+        ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
+        ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, 
indexingConfig);
+
+        // Assign instances
+        assignInstances(tableConfig, false);
+
+        break;
+
+      default:
+        throw new InvalidTableConfigException("Unsupported table type: " + 
tableType);
     }
   }
 
@@ -1396,9 +1400,11 @@ public class PinotHelixResourceManager {
     ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, 
offlineTableName);
     LOGGER.info("Deleting table {}: Removed table config", offlineTableName);
 
-    // Remove replica group partition assignment
-    
ZKMetadataProvider.removeInstancePartitionAssignmentFromPropertyStore(_propertyStore,
 offlineTableName);
-    LOGGER.info("Deleting table {}: Removed replica group partition 
assignment", offlineTableName);
+    // Remove instance partitions
+    InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+        
InstancePartitionsType.OFFLINE.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableName)));
+    LOGGER.info("Deleting table {}: Removed instance partitions", 
offlineTableName);
+
     LOGGER.info("Deleting table {}: Finish", offlineTableName);
   }
 
@@ -1430,11 +1436,15 @@ public class PinotHelixResourceManager {
     ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, 
realtimeTableName);
     LOGGER.info("Deleting table {}: Removed table config", realtimeTableName);
 
-    // Remove replica group partition assignment
-    
ZKMetadataProvider.removeInstancePartitionAssignmentFromPropertyStore(_propertyStore,
 realtimeTableName);
-    LOGGER.info("Deleting table {}: Removed replica group partition 
assignment", realtimeTableName);
+    // Remove instance partitions
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+        
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
+    InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+        
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
+    LOGGER.info("Deleting table {}: Removed instance partitions", 
realtimeTableName);
 
-    // Remove groupId/PartitionId mapping for HLC table
+    // Remove groupId/partitionId mapping for HLC table
     if (instancesForTable != null) {
       for (String instance : instancesForTable) {
         InstanceZKMetadata instanceZKMetadata = 
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instance);
@@ -1444,7 +1454,8 @@ public class PinotHelixResourceManager {
         }
       }
     }
-    LOGGER.info("Deleting table {}: Removed replica group partition 
assignment", realtimeTableName);
+    LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for 
HLC table", realtimeTableName);
+
     LOGGER.info("Deleting table {}: Finish", realtimeTableName);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
index 6d9a35f..4ed659e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.controller.helix.core.assignment;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,6 +35,7 @@ import org.apache.helix.ZNRecord;
  * list of server instances, and is persisted under the ZK path: {@code 
<cluster>/PROPERTYSTORE/INSTANCE_PARTITIONS}.
  * <p>The segment assignment will be based on the instance partitions of the 
table.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
   private static final char PARTITION_REPLICA_SEPARATOR = '_';
 
@@ -46,7 +50,9 @@ public class InstancePartitions {
     _partitionToInstancesMap = new TreeMap<>();
   }
 
-  private InstancePartitions(String name, Map<String, List<String>> 
partitionToInstancesMap) {
+  @JsonCreator
+  private InstancePartitions(@JsonProperty(value = "name", required = true) 
String name,
+      @JsonProperty(value = "partitionToInstancesMap", required = true) 
Map<String, List<String>> partitionToInstancesMap) {
     _name = name;
     _partitionToInstancesMap = partitionToInstancesMap;
     for (String key : partitionToInstancesMap.keySet()) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java
index be2c99d..8e74340 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java
@@ -20,10 +20,13 @@ package org.apache.pinot.controller.helix.core.assignment;
 
 import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
@@ -45,14 +48,23 @@ public class InstancePartitionsUtils {
   public static InstancePartitions 
fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig 
tableConfig,
       InstancePartitionsType instancePartitionsType) {
     String tableNameWithType = tableConfig.getTableName();
-    String instancePartitionsName =
-        
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType));
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    String instancePartitionsName = 
instancePartitionsType.getInstancePartitionsName(rawTableName);
 
     // Fetch the instance partitions from property store if exists
-    String path = 
ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName);
-    ZNRecord znRecord = helixManager.getHelixPropertyStore().get(path, null, 
AccessOption.PERSISTENT);
-    if (znRecord != null) {
-      return InstancePartitions.fromZNRecord(znRecord);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
helixManager.getHelixPropertyStore();
+    InstancePartitions instancePartitions = 
fetchInstancePartitions(propertyStore, instancePartitionsName);
+    if (instancePartitions != null) {
+      return instancePartitions;
+    }
+
+    // Use the CONSUMING instance partitions for COMPLETED segments if exists
+    if (instancePartitionsType == InstancePartitionsType.COMPLETED) {
+      InstancePartitions consumingInstancePartitions = 
fetchInstancePartitions(propertyStore,
+          
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
+      if (consumingInstancePartitions != null) {
+        return consumingInstancePartitions;
+      }
     }
 
     // Compute the instance partitions (for backward compatible)
@@ -64,19 +76,41 @@ public class InstancePartitionsUtils {
     instances.sort(null);
     int numInstances = instances.size();
     Collections.rotate(instances, -(Math.abs(tableNameWithType.hashCode()) % 
numInstances));
-    InstancePartitions instancePartitions = new 
InstancePartitions(instancePartitionsName);
+    instancePartitions = new InstancePartitions(instancePartitionsName);
     instancePartitions.setInstances(0, 0, instances);
     return instancePartitions;
   }
 
   /**
+   * Fetches the instance partitions from Helix property store.
+   */
+  @Nullable
+  public static InstancePartitions 
fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+      String instancePartitionsName) {
+    String path = 
ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName);
+    ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+    return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
+  }
+
+  /**
    * Persists the instance partitions to Helix property store.
-   *
-   * @return true if the instance partitions was successfully persisted, false 
otherwise
    */
-  public static boolean persistInstancePartitions(HelixPropertyStore<ZNRecord> 
propertyStore,
+  public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> 
propertyStore,
       InstancePartitions instancePartitions) {
     String path = 
ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitions.getName());
-    return propertyStore.set(path, instancePartitions.toZNRecord(), 
AccessOption.PERSISTENT);
+    if (!propertyStore.set(path, instancePartitions.toZNRecord(), 
AccessOption.PERSISTENT)) {
+      throw new ZkException("Failed to persist instance partitions: " + 
instancePartitions.getName());
+    }
+  }
+
+  /**
+   * Removes the instance partitions from Helix property store.
+   */
+  public static void removeInstancePartitions(HelixPropertyStore<ZNRecord> 
propertyStore,
+      String instancePartitionsName) {
+    String path = 
ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName);
+    if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+      throw new ZkException("Failed to remove instance partitions: " + 
instancePartitionsName);
+    }
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index a1fbc40..a715834 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.config.instance.InstanceConstraintConfig;
@@ -80,6 +81,9 @@ public class InstanceAssignmentDriver {
     Preconditions.checkState(replicaPartitionConfig != null, "Instance 
replica/partition config is missing");
     InstanceReplicaPartitionSelector replicaPartitionSelector =
         new InstanceReplicaPartitionSelector(replicaPartitionConfig, 
tableNameWithType);
-    return replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap);
+    InstancePartitions instancePartitions = new InstancePartitions(
+        
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+    replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, 
instancePartitions);
+    return instancePartitions;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
index 8f6976c..90c35fb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
@@ -51,7 +51,8 @@ public class InstanceReplicaPartitionSelector {
    * Selects instances based on the replica and partition config, and stores 
the result into the given instance
    * partitions.
    */
-  public InstancePartitions selectInstances(Map<Integer, List<InstanceConfig>> 
poolToInstanceConfigsMap) {
+  public void selectInstances(Map<Integer, List<InstanceConfig>> 
poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
     int numPools = poolToInstanceConfigsMap.size();
     Preconditions.checkState(numPools != 0, "No pool qualified for selection");
 
@@ -61,8 +62,6 @@ public class InstanceReplicaPartitionSelector {
     LOGGER.info("Starting instance replica/partition selection for table: {} 
with hash: {} from pools: {}",
         _tableNameWithType, tableNameHash, pools);
 
-    InstancePartitions instancePartitions = new 
InstancePartitions(_tableNameWithType);
-
     if (_replicaPartitionConfig.isReplicaGroupBased()) {
       // Replica-group based selection
 
@@ -183,7 +182,5 @@ public class InstanceReplicaPartitionSelector {
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
-
-    return instancePartitions;
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
new file mode 100644
index 0000000..9d6e7d7
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.FileNotFoundException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.config.Tenant;
+import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
+import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
+import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TenantRole;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest 
{
+  private static final int NUM_SERVER_INSTANCES = 3;
+  private static final String TENANT_NAME = "testTenant";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    ControllerConf config = getDefaultControllerConfiguration();
+    config.setTenantIsolationEnabled(false);
+    startController(config);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, false);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, false);
+
+    // Create broker and server tenant
+    Tenant brokerTenant = new 
Tenant.TenantBuilder(TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(1).build();
+    _helixResourceManager.createBrokerTenant(brokerTenant);
+    Tenant serverTenant =
+        new 
Tenant.TenantBuilder(TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(1).setRealtimeInstances(1)
+            .build();
+    _helixResourceManager.createServerTenant(serverTenant);
+  }
+
+  @Test
+  public void testInstanceAssignment()
+      throws Exception {
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addTime(TIME_COLUMN_NAME, 
TimeUnit.DAYS, DataType.INT)
+            .build();
+    _helixResourceManager.addOrUpdateSchema(schema);
+    TableConfig offlineTableConfig =
+        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(TENANT_NAME)
+            .setServerTenant(TENANT_NAME).build();
+    _helixResourceManager.addTable(offlineTableConfig);
+    TableConfig realtimeTableConfig =
+        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(TENANT_NAME)
+            .setServerTenant(TENANT_NAME).setLLC(true)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+    _helixResourceManager.addTable(realtimeTableConfig);
+
+    // There should be no instance partitions
+    try {
+      getInstancePartitionsMap();
+      fail();
+    } catch (FileNotFoundException e) {
+      // Expected
+    }
+
+    // Assign instances should fail
+    try {
+      
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
null, true), null);
+      fail();
+    } catch (FileNotFoundException e) {
+      // Expected
+    }
+
+    // Add OFFLINE instance assignment config to the OFFLINE table config
+    InstanceAssignmentConfig offlineInstanceAssignmentConfig = new 
InstanceAssignmentConfig();
+    InstanceTagPoolConfig offlineInstanceTagPoolConfig = new 
InstanceTagPoolConfig();
+    
offlineInstanceTagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(TENANT_NAME));
+    
offlineInstanceAssignmentConfig.setTagPoolConfig(offlineInstanceTagPoolConfig);
+    offlineInstanceAssignmentConfig.setReplicaPartitionConfig(new 
InstanceReplicaPartitionConfig());
+    offlineTableConfig.setInstanceAssignmentConfigMap(
+        Collections.singletonMap(InstancePartitionsType.OFFLINE, 
offlineInstanceAssignmentConfig));
+    _helixResourceManager.setExistingTableConfig(offlineTableConfig);
+
+    // OFFLINE instance partitions should be generated
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 1);
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertNotNull(offlineInstancePartitions);
+    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
+    assertEquals(offlineInstancePartitions.getInstances(0, 0).size(), 1);
+    String offlineInstanceId = offlineInstancePartitions.getInstances(0, 
0).get(0);
+
+    // Add CONSUMING instance assignment config to the REALTIME table config
+    InstanceAssignmentConfig consumingInstanceAssignmentConfig = new 
InstanceAssignmentConfig();
+    InstanceTagPoolConfig realtimeInstanceTagPoolConfig = new 
InstanceTagPoolConfig();
+    
realtimeInstanceTagPoolConfig.setTag(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME));
+    
consumingInstanceAssignmentConfig.setTagPoolConfig(realtimeInstanceTagPoolConfig);
+    consumingInstanceAssignmentConfig.setReplicaPartitionConfig(new 
InstanceReplicaPartitionConfig());
+    realtimeTableConfig.setInstanceAssignmentConfigMap(
+        Collections.singletonMap(InstancePartitionsType.CONSUMING, 
consumingInstanceAssignmentConfig));
+    _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
+
+    // CONSUMING instance partitions should be generated
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 2);
+    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertNotNull(offlineInstancePartitions);
+    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
+    assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+    InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    assertNotNull(consumingInstancePartitions);
+    assertEquals(consumingInstancePartitions.getNumReplicas(), 1);
+    assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
+    assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1);
+    String realtimeInstanceId = consumingInstancePartitions.getInstances(0, 
0).get(0);
+
+    // Use OFFLINE instance assignment config as the COMPLETED instance 
assignment config
+    realtimeTableConfig
+        .setInstanceAssignmentConfigMap(new TreeMap<InstancePartitionsType, 
InstanceAssignmentConfig>() {{
+          put(InstancePartitionsType.CONSUMING, 
consumingInstanceAssignmentConfig);
+          put(InstancePartitionsType.COMPLETED, 
offlineInstanceAssignmentConfig);
+        }});
+    _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
+
+    // COMPLETED instance partitions should be generated
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 3);
+    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertNotNull(offlineInstancePartitions);
+    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
+    assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+    consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    assertNotNull(consumingInstancePartitions);
+    assertEquals(consumingInstancePartitions.getNumReplicas(), 1);
+    assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
+    assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(realtimeInstanceId));
+    InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    assertEquals(completedInstancePartitions.getNumReplicas(), 1);
+    assertEquals(completedInstancePartitions.getNumPartitions(), 1);
+    assertEquals(completedInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+
+    // Test fetching instance partitions by table name with type suffix
+    instancePartitionsMap = 
deserializeInstancePartitionsMap(sendGetRequest(_controllerRequestURLBuilder
+        
.forInstancePartitions(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME),
 null)));
+    assertEquals(instancePartitionsMap.size(), 1);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+    instancePartitionsMap = 
deserializeInstancePartitionsMap(sendGetRequest(_controllerRequestURLBuilder
+        
.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
 null)));
+    assertEquals(instancePartitionsMap.size(), 2);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
+    // Test fetching instance partitions by table name and instance partitions 
type
+    for (InstancePartitionsType instancePartitionsType : 
InstancePartitionsType.values()) {
+      instancePartitionsMap = deserializeInstancePartitionsMap(
+          
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 instancePartitionsType)));
+      assertEquals(instancePartitionsMap.size(), 1);
+      assertEquals(instancePartitionsMap.get(instancePartitionsType).getName(),
+          instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME));
+    }
+
+    // Remove the instance partitions for both OFFLINE and REALTIME table
+    
sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null));
+    try {
+      getInstancePartitionsMap();
+      fail();
+    } catch (FileNotFoundException e) {
+      // Expected
+    }
+
+    // Assign instances without instance partitions type (dry run)
+    instancePartitionsMap = deserializeInstancePartitionsMap(
+        
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
null, true), null));
+    assertEquals(instancePartitionsMap.size(), 3);
+    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertNotNull(offlineInstancePartitions);
+    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
+    assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+    consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    assertNotNull(consumingInstancePartitions);
+    assertEquals(consumingInstancePartitions.getNumReplicas(), 1);
+    assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
+    assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(realtimeInstanceId));
+    completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    assertEquals(completedInstancePartitions.getNumReplicas(), 1);
+    assertEquals(completedInstancePartitions.getNumPartitions(), 1);
+    assertEquals(completedInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+
+    // Instance partitions should not be persisted
+    try {
+      getInstancePartitionsMap();
+      fail();
+    } catch (FileNotFoundException e) {
+      // Expected
+    }
+
+    // Assign instances for both OFFLINE and REALTIME table
+    
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
null, false), null);
+
+    // Instance partitions should be persisted
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 3);
+
+    // Remove the instance partitions for REALTIME table
+    sendDeleteRequest(_controllerRequestURLBuilder
+        
.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
 null));
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 1);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+
+    // Assign instances for COMPLETED segments
+    instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
+        _controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
InstancePartitionsType.COMPLETED, false), null));
+    assertEquals(instancePartitionsMap.size(), 1);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
+    // There should be OFFLINE and COMPLETED instance partitions persisted
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 2);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
+    // Replace OFFLINE instance with REALTIME instance for COMPLETED instance 
partitions
+    instancePartitionsMap = 
deserializeInstancePartitionsMap(sendPostRequest(_controllerRequestURLBuilder
+            .forInstanceReplace(RAW_TABLE_NAME, 
InstancePartitionsType.COMPLETED, offlineInstanceId, realtimeInstanceId),
+        null));
+    assertEquals(instancePartitionsMap.size(), 1);
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
 0),
+        Collections.singletonList(realtimeInstanceId));
+
+    // Replace the instance again using REALTIME table name (old instance does 
not exist)
+    try {
+      sendPostRequest(_controllerRequestURLBuilder
+          
.forInstanceReplace(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
 null, offlineInstanceId,
+              realtimeInstanceId), null);
+      fail();
+    } catch (FileNotFoundException e) {
+      // Expected
+    }
+
+    // Put (overwrite) the CONSUMING instance partitions
+    instancePartitionsMap = deserializeInstancePartitionsMap(
+        
sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null),
+            JsonUtils.objectToString(consumingInstancePartitions)));
+    assertEquals(instancePartitionsMap.size(), 1);
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
+        Collections.singletonList(realtimeInstanceId));
+
+    // OFFLINE instance partitions should have OFFLINE instance, CONSUMING and 
COMPLETED instance partitions should have
+    // REALTIME instance
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 3);
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0,
 0),
+        Collections.singletonList(offlineInstanceId));
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
+        Collections.singletonList(realtimeInstanceId));
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
 0),
+        Collections.singletonList(realtimeInstanceId));
+
+    // Delete the OFFLINE table
+    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+    instancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(instancePartitionsMap.size(), 2);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
+    // Delete the REALTIME table
+    _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+    try {
+      getInstancePartitionsMap();
+      fail();
+    } catch (FileNotFoundException e) {
+      // Expected
+    }
+  }
+
+  private Map<InstancePartitionsType, InstancePartitions> 
getInstancePartitionsMap()
+      throws Exception {
+    return deserializeInstancePartitionsMap(
+        
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null)));
+  }
+
+  private Map<InstancePartitionsType, InstancePartitions> 
deserializeInstancePartitionsMap(
+      String instancePartitionsMapString)
+      throws Exception {
+    return JsonUtils.stringToObject(instancePartitionsMapString,
+        new TypeReference<Map<InstancePartitionsType, InstancePartitions>>() {
+        });
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 8639b00..fec7a73 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -29,8 +29,8 @@ import 
org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TagNameUtils;
-import org.apache.pinot.common.config.TagOverrideConfig;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
+import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
 import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -40,6 +40,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.fail;
 
 
@@ -47,34 +48,9 @@ public class InstanceAssignmentTest {
   private static final String RAW_TABLE_NAME = "myTable";
   private static final String TENANT_NAME = "tenant";
   private static final String OFFLINE_TAG = 
TagNameUtils.getOfflineTagForTenant(TENANT_NAME);
-  private static final String REALTIME_TAG = 
TagNameUtils.getRealtimeTagForTenant(TENANT_NAME);
   private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
 
   @Test
-  public void testDefaultOfflineNonReplicaGroup() {
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME).build();
-    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
-    int numInstances = 10;
-    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
-    for (int i = 0; i < numInstances; i++) {
-      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
-      instanceConfig.addTag(OFFLINE_TAG);
-      instanceConfigs.add(instanceConfig);
-    }
-
-    // All instances should be assigned as replica 0 partition 0
-    InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    List<String> expectedInstances = new ArrayList<>(numInstances);
-    for (int i = 0; i < numInstances; i++) {
-      expectedInstances.add(SERVER_INSTANCE_ID_PREFIX + i);
-    }
-    assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
-  }
-
-  @Test
   public void testDefaultOfflineReplicaGroup() {
     int numReplicas = 3;
     TableConfig tableConfig =
@@ -149,53 +125,6 @@ public class InstanceAssignmentTest {
   }
 
   @Test
-  public void testRealtimeTagOverride() {
-    // Override COMPLETED tag to use OFFLINE tag
-    TagOverrideConfig tagOverrideConfig = new TagOverrideConfig();
-    tagOverrideConfig.setRealtimeConsuming(REALTIME_TAG);
-    tagOverrideConfig.setRealtimeCompleted(OFFLINE_TAG);
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTagOverrideConfig(tagOverrideConfig)
-            .build();
-    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
-    int numInstances = 10;
-    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
-    for (int i = 0; i < numInstances; i++) {
-      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
-      if (i % 2 == 0) {
-        instanceConfig.addTag(REALTIME_TAG);
-      } else {
-        instanceConfig.addTag(OFFLINE_TAG);
-      }
-      instanceConfigs.add(instanceConfig);
-    }
-
-    // All instances with REALTIME tag should be assigned as replica 0 
partition 0 for CONSUMING instance partitions
-    InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    List<String> expectedInstances = new ArrayList<>(numInstances);
-    for (int i = 0; i < numInstances; i++) {
-      if (i % 2 == 0) {
-        expectedInstances.add(SERVER_INSTANCE_ID_PREFIX + i);
-      }
-    }
-    assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
-
-    // All instances with OFFLINE tag should be assigned as replica 0 
partition 0 for COMPLETED instance partitions
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), 1);
-    assertEquals(instancePartitions.getNumPartitions(), 1);
-    expectedInstances = new ArrayList<>(numInstances);
-    for (int i = 0; i < numInstances; i++) {
-      if (i % 2 != 0) {
-        expectedInstances.add(SERVER_INSTANCE_ID_PREFIX + i);
-      }
-    }
-    assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
-  }
-
-  @Test
   public void testPoolBased() {
     // 10 instances in 2 pools, each with 5 instances
     int numInstances = 10;
@@ -329,10 +258,7 @@ public class InstanceAssignmentTest {
 
   @Test
   public void testIllegalConfig() {
-    InstanceAssignmentConfig assignmentConfig = new InstanceAssignmentConfig();
-    TableConfig tableConfig = new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 assignmentConfig))
-        .build();
+    TableConfig tableConfig = new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
 
     int numInstances = 10;
@@ -342,6 +268,19 @@ public class InstanceAssignmentTest {
       instanceConfigs.add(instanceConfig);
     }
 
+    // No instance assignment config
+    
assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, 
InstancePartitionsType.OFFLINE));
+    try {
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Instance assignment is not allowed for the 
given table config");
+    }
+
+    InstanceAssignmentConfig assignmentConfig = new InstanceAssignmentConfig();
+    tableConfig
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 assignmentConfig));
+
     // No instance tag/pool config
     try {
       driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
index 1a5d017..41bdc8d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
@@ -217,9 +217,11 @@ public class ReplicaGroupRebalanceStrategyTest extends 
ControllerTest {
     }
 
     // Test removing a replica group
+    // NOTE: ReplicaGroupRebalanceSegmentStrategy is able to remove a 
replica-group only when all the instances in the
+    //       replica are removed
     _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_0", 
OFFLINE_TENENT_NAME);
-    _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_1", 
OFFLINE_TENENT_NAME);
     _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_2", 
OFFLINE_TENENT_NAME);
+    _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_4", 
OFFLINE_TENENT_NAME);
 
     targetNumInstancePerPartition = 3;
     targetNumReplicaGroup = 2;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 92ea2af..9f35b9b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -206,78 +206,6 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     Assert.assertTrue(partitionAssignment == null);
   }
 
-  /**
-   * Tests to check creation/deletion of replica group znode when table is 
created/updated/deleted
-   */
-  @Test
-  public void testRealtimeReplicaGroupPartitionAssignment() {
-    String rawTableName = TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT;
-    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-
-    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(rawTableName).build();
-    _helixResourceManager.addOrUpdateSchema(schema);
-
-    Map<String, String> streamConfigMap = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
-
-    // Adding a table without replica group
-    TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
-        .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
-        
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS)
-        .setStreamConfigs(streamConfigMap).setLLC(true).build();
-    try {
-      _helixResourceManager.addTable(tableConfig);
-    } catch (Exception e) {
-      // ignore
-    }
-
-    // Check that partition assignment does not exist
-    ReplicaGroupPartitionAssignment partitionAssignment =
-        
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
-
-    Assert.assertNull(partitionAssignment);
-
-    // Update table config with replica group config
-    int numInstancesPerPartition = 5;
-    ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new 
ReplicaGroupStrategyConfig();
-    
replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerPartition);
-    replicaGroupStrategyConfig.setMirrorAssignmentAcrossReplicaGroups(true);
-
-    TableConfig replicaGroupTableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
-        
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICAS)
-        
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").setStreamConfigs(streamConfigMap)
-        .setLLC(true).build();
-
-    
replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
-
-    // Check that the replica group partition assignment is created
-    try {
-      _helixResourceManager.setExistingTableConfig(replicaGroupTableConfig);
-    } catch (Exception e) {
-      // ignore
-    }
-    partitionAssignment = 
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
-    Assert.assertNotNull(partitionAssignment);
-
-    // After table deletion, check that the replica group partition assignment 
is deleted
-    _helixResourceManager.deleteRealtimeTable(rawTableName);
-    partitionAssignment = 
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
-    Assert.assertNull(partitionAssignment);
-
-    // Create a table with replica group
-    try {
-      _helixResourceManager.addTable(replicaGroupTableConfig);
-    } catch (Exception e) {
-      // ignore
-    }
-    partitionAssignment = 
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
-    Assert.assertNotNull(partitionAssignment);
-
-    // Check that the replica group partition assignment is deleted
-    _helixResourceManager.deleteRealtimeTable(rawTableName);
-    partitionAssignment = 
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
-    Assert.assertNull(partitionAssignment);
-  }
-
   @Test
   public void testTableLevelAndMirroringReplicaGroupSegmentAssignmentStrategy()
       throws Exception {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java
index 2c37800..5b0bd28 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java
@@ -54,39 +54,4 @@ public class ReplicationUtils {
     }
     return false;
   }
-
-  /**
-   * Returns the {@link 
SegmentsValidationAndRetentionConfig::getReplicasPerPartitionNumber} if it is 
eligible for use,
-   * else returns the {@link 
SegmentsValidationAndRetentionConfig::getReplicationNumber}
-   *
-   * The reason we have replication as well as replicasPerPartition is, 
"replication" is used in HLC to support 'split' kafka topics
-   * For example, if replication is 3, and we have 6 realtime servers, each 
server will half of the events in the topic
-   * (see @link 
PinotTableIdealStateBuilder::setupInstanceConfigForHighLevelConsumer}
-   * ReplicasPerPartition is used in LLC as the replication.
-   * We need to keep both, as we could be operating in dual mode during 
migrations from HLC to LLC
-   */
-  public static int getReplication(TableConfig tableConfig) {
-
-    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
-    return useReplicasPerPartition(tableConfig) ? 
validationConfig.getReplicasPerPartitionNumber() : validationConfig
-        .getReplicationNumber();
-  }
-
-  /**
-   * Check if replica groups setup is necessary for realtime
-   *
-   * Replica groups is supported when only LLC is present.
-   * We do not want znode being created or failures in some validations, when 
attempting to setup replica groups in HLC.
-   * As a result, Replica groups cannot be used during migrations from HLC to 
LLC.
-   * In such a scenario, migrate to LLC completely, and then enable replica 
groups
-   *
-   */
-  public static boolean setupRealtimeReplicaGroups(TableConfig tableConfig) {
-    TableType tableType = tableConfig.getTableType();
-    if (tableType.equals(TableType.REALTIME)) {
-      StreamConfig streamConfig = new 
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
-      return streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType();
-    }
-    return false;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to