This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e37f2c837 pre-configuration based assignment (#11578)
2e37f2c837 is described below

commit 2e37f2c837dd736ef94b0af4bc3f0d800f279f97
Author: Jia Guo <[email protected]>
AuthorDate: Mon Oct 30 09:06:39 2023 -0700

    pre-configuration based assignment (#11578)
    
    * add tenant partition map rest resource
    
    add tenant partition map rest resource
    
    fix logic
    
    refactor and fix some logic
    
    pre-configuration based assignment
    
    * checkstyle
    
    * checkstyle
    
    * checkstyle
    
    * Trigger Test
    
    * Address comments
    
    * refactored branches
    
    * address comments
    
    * address comments
    
    * address comments
    
    * Change the shuffling logic to be compatible with future changes
    
    * add test cases
    
    * address comments
    
    * address comments
    
    * address comments
    
    * change table validation logic
    
    * Trigger Test
    
    * update shuffling logic
    
    * add more logging
---
 .../assignment/InstanceAssignmentConfigUtils.java  |  10 +
 .../PinotInstanceAssignmentRestletResource.java    |  44 +-
 .../api/resources/PinotTenantRestletResource.java  |  82 +-
 .../helix/core/PinotHelixResourceManager.java      |  24 +-
 .../instance/InstanceAssignmentDriver.java         |  21 +-
 .../instance/InstancePartitionSelectorFactory.java |  12 +-
 .../MirrorServerSetInstancePartitionSelector.java  | 366 ++++++++
 .../helix/core/rebalance/TableRebalancer.java      |  63 +-
 .../instance/InstanceAssignmentTest.java           | 964 ++++++++++++++++++++-
 .../java/org/apache/pinot/core/auth/Actions.java   |   2 +
 .../segment/local/utils/TableConfigUtils.java      |  24 +-
 .../table/assignment/InstanceAssignmentConfig.java |   3 +-
 12 files changed, 1559 insertions(+), 56 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index b37429c527..30a3a19f20 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -124,4 +124,14 @@ public class InstanceAssignmentConfigUtils {
 
     return new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig);
   }
+
+  public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
+      InstancePartitionsType instancePartitionsType) {
+    // True only if an instance assignment config exists for this type and its partition 
selector is
+    // MIRROR_SERVER_SET_PARTITION_SELECTOR.
+    return 
tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString())
 != null
+        && 
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, 
instancePartitionsType)
+        .getPartitionSelector()
+        == 
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 3eeb6665a4..282431e04b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -69,7 +69,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+import static org.apache.pinot.spi.utils.CommonConstants.*;
 
 
 @Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
@@ -244,20 +244,38 @@ public class PinotInstanceAssignmentRestletResource {
   private void assignInstancesForInstancePartitionsType(Map<String, 
InstancePartitions> instancePartitionsMap,
       TableConfig tableConfig, List<InstanceConfig> instanceConfigs, 
InstancePartitionsType instancePartitionsType) {
     String tableNameWithType = tableConfig.getTableName();
-    if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType)) {
-      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType)) {
+      InstancePartitions existingInstancePartitions =
+          
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+              
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, 
instancePartitionsType.toString()));
       instancePartitionsMap.put(instancePartitionsType.toString(),
-          
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
-              
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
-              instancePartitionsType.getInstancePartitionsName(rawTableName)));
-      return;
-    }
-    InstancePartitions existingInstancePartitions =
-        
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+          new 
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, 
instanceConfigs,
+              existingInstancePartitions));
+    } else {
+      if 
(InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, 
instancePartitionsType)) {
+        // fetch the existing instance partitions of the table, if any; this is 
referenced in the new instance partitions
+        // generation for minimum difference
+        InstancePartitions existingInstancePartitions = 
InstancePartitionsUtils.fetchInstancePartitions(
+            _resourceManager.getHelixZkManager().getHelixPropertyStore(),
             
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, 
instancePartitionsType.toString()));
-    instancePartitionsMap.put(instancePartitionsType.toString(),
-        new 
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, 
instanceConfigs,
-            existingInstancePartitions));
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+        // fetch the pre-configured instance partitions; the renaming part is 
irrelevant as we are not really
+        // preserving this preConfigured, but only using it as a reference to 
generate the new instance partitions
+        InstancePartitions preConfigured =
+            
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+                
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+                
instancePartitionsType.getInstancePartitionsName(rawTableName));
+        instancePartitionsMap.put(instancePartitionsType.toString(),
+            new 
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, 
instanceConfigs,
+                existingInstancePartitions, preConfigured));
+      } else {
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+        instancePartitionsMap.put(instancePartitionsType.toString(),
+            
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+                
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+                
instancePartitionsType.getInstancePartitionsName(rawTableName)));
+      }
+    }
   }
 
   private void assignInstancesForTier(Map<String, InstancePartitions> 
instancePartitionsMap, TableConfig tableConfig,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 3aa5da48e9..8166427a93 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,8 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -68,13 +71,14 @@ import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+import static org.apache.pinot.spi.utils.CommonConstants.*;
 
 
 /**
@@ -286,6 +290,82 @@ public class PinotTenantRestletResource {
     }
   }
 
+  @GET
+  @Path("/tenants/{tenantName}/instancePartitions")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_INSTANCE_PARTITIONS)
+  @Authenticate(AccessType.READ)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the instance partitions of a tenant")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success", 
response = InstancePartitions.class),
+      @ApiResponse(code = 404, message = "Instance partitions not found")})
+  public InstancePartitions getInstancePartitions(
+      @ApiParam(value = "Tenant name ", required = true) 
@PathParam("tenantName") String tenantName,
+      @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", 
required = true,
+          allowableValues = "OFFLINE, CONSUMING, COMPLETED")
+      @QueryParam("instancePartitionType") String instancePartitionType) {
+    String tenantNameWithType = 
InstancePartitionsType.valueOf(instancePartitionType)
+        .getInstancePartitionsName(tenantName);
+    InstancePartitions instancePartitions =
+        
InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
+            tenantNameWithType);
+
+    if (instancePartitions == null) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to find the instance partitions for %s", 
tenantNameWithType),
+          Response.Status.NOT_FOUND);
+    } else {
+      return instancePartitions;
+    }
+  }
+
+  @PUT
+  @Path("/tenants/{tenantName}/instancePartitions")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_INSTANCE_PARTITIONS)
+  @Authenticate(AccessType.UPDATE)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update an instance partition for a server type in a 
tenant")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success", 
response = InstancePartitions.class),
+      @ApiResponse(code = 400, message = "Failed to deserialize/validate the 
instance partitions"),
+      @ApiResponse(code = 500, message = "Error updating the tenant")})
+  public InstancePartitions assignInstancesPartitionMap(
+      @ApiParam(value = "Tenant name ", required = true) 
@PathParam("tenantName") String tenantName,
+      @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", 
required = true,
+          allowableValues = "OFFLINE, CONSUMING, COMPLETED")
+      @QueryParam("instancePartitionType") String instancePartitionType,
+      String instancePartitionsStr) {
+    InstancePartitions instancePartitions;
+    try {
+      instancePartitions = JsonUtils.stringToObject(instancePartitionsStr, 
InstancePartitions.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to deserialize 
the instance partitions",
+          Response.Status.BAD_REQUEST);
+    }
+
+    String inputTenantName = 
InstancePartitionsType.valueOf(instancePartitionType)
+        .getInstancePartitionsName(tenantName);
+
+    if 
(!instancePartitions.getInstancePartitionsName().equals(inputTenantName)) {
+      throw new ControllerApplicationException(LOGGER, "Instance partitions 
name mismatch, expected: "
+          + inputTenantName
+          + ", got: " + instancePartitions.getInstancePartitionsName(), 
Response.Status.BAD_REQUEST);
+    }
+
+    persistInstancePartitionsHelper(instancePartitions);
+    return instancePartitions;
+  }
+
+  private void persistInstancePartitionsHelper(InstancePartitions 
instancePartitions) {
+    try {
+      LOGGER.info("Persisting instance partitions: {}", instancePartitions);
+      
InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
+          instancePartitions);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Caught Exception while 
persisting the instance partitions",
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   private String getTablesServedFromServerTenant(String tenantName) {
     Set<String> tables = new HashSet<>();
     ObjectNode resourceGetRet = JsonUtils.newObjectNode();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9df5835216..8e1ff52e13 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1753,20 +1753,30 @@ public class PinotHelixResourceManager {
       for (InstancePartitionsType instancePartitionsType : 
instancePartitionsTypesToAssign) {
         boolean hasPreConfiguredInstancePartitions =
             TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType);
+        boolean isPreConfigurationBasedAssignment =
+            
InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, 
instancePartitionsType);
         InstancePartitions instancePartitions;
         if (!hasPreConfiguredInstancePartitions) {
           instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType, 
instanceConfigs, null);
           LOGGER.info("Persisting instance partitions: {}", 
instancePartitions);
-          InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
         } else {
           String referenceInstancePartitionsName = 
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
-          instancePartitions =
-              
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, 
referenceInstancePartitionsName,
-                  
instancePartitionsType.getInstancePartitionsName(rawTableName));
-          LOGGER.info("Persisting instance partitions: {} (referencing {})", 
instancePartitions,
-              referenceInstancePartitionsName);
-          InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
+          if (isPreConfigurationBasedAssignment) {
+            InstancePartitions preConfiguredInstancePartitions =
+                
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
+                    referenceInstancePartitionsName, 
instancePartitionsType.getInstancePartitionsName(rawTableName));
+            instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType, 
instanceConfigs, null,
+                preConfiguredInstancePartitions);
+            LOGGER.info("Persisting instance partitions: {} (based on {})", 
instancePartitions,
+                preConfiguredInstancePartitions);
+          } else {
+            instancePartitions = 
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
+                referenceInstancePartitionsName, 
instancePartitionsType.getInstancePartitionsName(rawTableName));
+            LOGGER.info("Persisting instance partitions: {} (referencing {})", 
instancePartitions,
+                referenceInstancePartitionsName);
+          }
         }
+        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
       }
     }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 7a5c901029..6d869b86c1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -60,19 +60,31 @@ public class InstanceAssignmentDriver {
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions, @Nullable
+      InstancePartitions preConfiguredInstancePartitions) {
+    String tableNameWithType = _tableConfig.getTableName();
+    InstanceAssignmentConfig assignmentConfig =
+        
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
+    return getInstancePartitions(
+        
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
-      @Nullable InstancePartitions existingInstancePartitions) {
+      @Nullable InstancePartitions existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
@@ -93,7 +105,8 @@ public class InstanceAssignmentDriver {
 
     InstancePartitionSelector instancePartitionSelector =
         
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
-            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), 
tableNameWithType, existingInstancePartitions);
+            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), 
tableNameWithType, existingInstancePartitions,
+            preConfiguredInstancePartitions);
     InstancePartitions instancePartitions = new 
InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, 
instancePartitions);
     return instancePartitions;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index f786ffe0b4..256aa89b02 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -31,7 +31,14 @@ public class InstancePartitionSelectorFactory {
 
   public static InstancePartitionSelector 
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
       InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, 
String tableNameWithType,
-      InstancePartitions existingInstancePartitions
+      InstancePartitions existingInstancePartitions) {
+    return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, 
tableNameWithType,
+        existingInstancePartitions, null);
+  }
+
+  public static InstancePartitionSelector 
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
+      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, 
String tableNameWithType,
+      InstancePartitions existingInstancePartitions, InstancePartitions 
preConfiguredInstancePartitions
   ) {
     switch (partitionSelector) {
       case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
@@ -40,6 +47,9 @@ public class InstancePartitionSelectorFactory {
       case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
         return new 
InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, 
tableNameWithType,
             existingInstancePartitions);
+      case MIRROR_SERVER_SET_PARTITION_SELECTOR:
+        return new 
MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, 
tableNameWithType,
+            existingInstancePartitions, preConfiguredInstancePartitions);
       default:
         throw new IllegalStateException("Unexpected PartitionSelector: " + 
partitionSelector + ", should be from"
             + 
Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
new file mode 100644
index 0000000000..6b4086615a
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * For the detailed design, see 
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the 
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancesPerReplicaGroup mirror 
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a 
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ *  1. The number of replica groups in the tenant level instance partitions is 
the same as the number of replica groups
+ *     in the table config.
+ *  2. The number of partitions at replica group level is 1
+ *  3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends 
InstancePartitionSelector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+  private final InstancePartitions _preConfiguredInstancePartitions;
+
+  // dimensions of target instance partition
+  private final int _numTargetInstancesPerReplicaGroup;
+  private final int _numTargetReplicaGroups;
+  private final int _numTargetTotalInstances;
+  // look up tables for pre-configured instance partition
+  private final List<List<String>> _preConfiguredMirroredServerLists = new 
ArrayList<>();
+  private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap = 
new HashMap<>();
+  private final List<List<String>> _existingMirroredServerLists = new 
ArrayList<>();
+  // dimensions of pre-configured instance partition
+  private int _numPreConfiguredReplicaGroups;
+  private int _numPreConfiguredInstancesPerReplicaGroup;
+  // dimensions of existing instance partition
+  private int _numExistingReplicaGroups;
+  private int _numExistingInstancesPerReplicaGroup;
+
+  public 
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
+      String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions,
+      InstancePartitions preConfiguredInstancePartitions) {
+    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions);
+    _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+    _numTargetInstancesPerReplicaGroup = 
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+    _numTargetReplicaGroups = 
_replicaGroupPartitionConfig.getNumReplicaGroups();
+    _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup * 
_numTargetReplicaGroups;
+  }
+
+  /**
+   * validate if the poolToInstanceConfigsMap is a valid input for 
pre-configuration based replica-group selection
+   */
+  private void validatePoolDiversePreconditions(Map<Integer, 
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+    LOGGER.info("Validating pre-configured instance partitions for 
pre-configuration based replica-group selection");
+
+    // numTargetInstancesPerReplica should be positive
+    LOGGER.info("Number of instances per replica: {}", 
_numTargetInstancesPerReplicaGroup);
+    Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+        "Number of instances per replica must be positive");
+
+    // _numTargetReplicaGroups should be positive
+    LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+    Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of 
replica-groups must be positive");
+
+    // validate target partition count is 1
+    LOGGER.info("Number of partitions: {}", 
_replicaGroupPartitionConfig.getNumPartitions());
+    Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions() 
<= 1,
+        "This algorithm does not support table level partitioning for target 
assignment");
+
+    // Validate the existing instance partitions is null or has only one 
partition
+    LOGGER.info("Number of partitions in existing instance partitions: {}",
+        _existingInstancePartitions == null ? 0 : 
_existingInstancePartitions.getNumPartitions());
+    Preconditions.checkState(
+        (_existingInstancePartitions == null || 
_existingInstancePartitions.getNumPartitions() == 1),
+        "This algorithm does not support replica group level partitioning for 
existing assignment");
+
+    _numExistingReplicaGroups =
+        _existingInstancePartitions == null ? 0 : 
_existingInstancePartitions.getNumReplicaGroups();
+    _numExistingInstancesPerReplicaGroup =
+        _existingInstancePartitions == null ? 0 : 
_existingInstancePartitions.getInstances(0, 0).size();
+
+    // Validate the pre-configured instance partitions is not null and has 
only one partition
+    Preconditions.checkState(_preConfiguredInstancePartitions != null,
+        "Pre-configured instance partitions must be provided for 
pre-configuration based selection");
+    LOGGER.info("Number of partitions in pre-configured instance partitions: 
{}",
+        _preConfiguredInstancePartitions.getNumPartitions());
+    
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() == 
1,
+        "This algorithm does not support table level partitioning for 
pre-configured assignment");
+
+    // Validate the number of replica-groups in the pre-configured instance 
partitions is equal to the target
+    // number of replica-groups
+    _numPreConfiguredReplicaGroups = 
_preConfiguredInstancePartitions.getNumReplicaGroups();
+    LOGGER.info("Number of replica-groups in pre-configured instance 
partitions: {}", _numPreConfiguredReplicaGroups);
+    Preconditions.checkState(_numPreConfiguredReplicaGroups == 
_numTargetReplicaGroups,
+        "The number of replica-groups %s in the pre-configured instance 
partitions "
+            + "is not equal to the target number of replica-groups %s", 
_numPreConfiguredReplicaGroups,
+        _numTargetReplicaGroups);
+
+    // Validate the number of instances per replica-group in the 
pre-configured instance partitions is greater than or
+    // equal to the target number of instances per replica-group
+    _numPreConfiguredInstancesPerReplicaGroup = 
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+    LOGGER.info("Number of instances per replica-group in pre-configured 
instance partitions: {}, target number of "
+            + "instances per replica-group: {}", 
_numPreConfiguredInstancesPerReplicaGroup,
+        _numTargetInstancesPerReplicaGroup);
+    Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >= 
_numTargetInstancesPerReplicaGroup,
+        "The number of instances per replica-group in the pre-configured "
+            + "instance partitions is less than the target number of instances 
per replica-group %s",
+        _numTargetInstancesPerReplicaGroup);
+
+    // Validate the pool to instance configs map is not null or empty
+    Preconditions.checkNotNull(poolToInstanceConfigsMap, 
"poolToInstanceConfigsMap is null");
+    int numPools = poolToInstanceConfigsMap.size();
+    Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+    Integer totalInstanceCount = 
poolToInstanceConfigsMap.values().stream().map(List::size)
+        .reduce(Integer::sum).orElse(0);
+    LOGGER.info("Total number of instances in all pools: {}, target number of 
instances: {}", totalInstanceCount,
+        _numTargetTotalInstances);
+    Preconditions.checkState(totalInstanceCount
+            >= _numTargetTotalInstances,
+        "The total number of instances in all pools is less than the target 
number of target instances");
+
+    HashSet<String> availableInstanceSet = new HashSet<>();
+    poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i -> 
availableInstanceSet.add(i.getInstanceName())));
+    LOGGER.info("Number of pools: {}", numPools);
+    LOGGER.info("Number of instances in all pools: {}", 
availableInstanceSet.size());
+    LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+    for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+      List<String> instances = 
_preConfiguredInstancePartitions.getInstances(0, i);
+      for (String instance : instances) {
+        Preconditions.checkState(availableInstanceSet.contains(instance),
+            "Instance %s in pre-configured instance partitions is not in "
+                + "the pool to instance configs map",
+            instance);
+      }
+    }
+
+    LOGGER.info("Validation passed. The instances provided can satisfy the 
pool diverse requirement.");
+    LOGGER.info("Trying to assign total {} instances to {} replica groups, " + 
"with {} instance per replica group",
+        _numTargetTotalInstances, _numTargetReplicaGroups, 
_numTargetInstancesPerReplicaGroup);
+  }
+
+  /**
+   * Transposes partition 0 of the pre-configured instance partitions into "mirrored server" lists.
+   *
+   * <p>For every instance offset, one list is appended to {@code _preConfiguredMirroredServerLists} holding the
+   * instance found at that offset in each pre-configured replica group (in replica-group order).
+   */
+  private void createMirrorServerListFromPreconfiguredInstancePartition() {
+    // Collect the instance list of each pre-configured replica group up front
+    List<List<String>> replicaGroupInstances = new ArrayList<>(_numPreConfiguredReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numPreConfiguredReplicaGroups; replicaGroupId++) {
+      List<String> instances = _preConfiguredInstancePartitions.getInstances(0, replicaGroupId);
+      replicaGroupInstances.add(instances);
+    }
+
+    // One mirrored set per offset: the servers sitting at the same position in every replica group
+    for (int offset = 0; offset < _numPreConfiguredInstancesPerReplicaGroup; offset++) {
+      List<String> mirrorSet = new ArrayList<>();
+      for (List<String> instances : replicaGroupInstances) {
+        mirrorSet.add(instances.get(offset));
+      }
+      _preConfiguredMirroredServerLists.add(mirrorSet);
+    }
+  }
+
+  /**
+   * Fills {@code _preConfiguredInstanceNameToOffsetMap} with the offset (position inside its replica group) of every
+   * instance in partition 0 of the pre-configured instance partitions.
+   */
+  private void createMirrorServerListLookupTablesFromPreconfiguredInstancePartition() {
+    // Collect the instance list of each pre-configured replica group up front
+    List<List<String>> replicaGroupInstances = new ArrayList<>(_numPreConfiguredReplicaGroups);
+    int replicaGroupId = 0;
+    while (replicaGroupId < _numPreConfiguredReplicaGroups) {
+      replicaGroupInstances.add(_preConfiguredInstancePartitions.getInstances(0, replicaGroupId));
+      replicaGroupId++;
+    }
+
+    // Replica groups are visited in order, so a repeated instance name keeps the offset of its last occurrence
+    for (List<String> instances : replicaGroupInstances) {
+      for (int offset = 0; offset < _numPreConfiguredInstancesPerReplicaGroup; offset++) {
+        _preConfiguredInstanceNameToOffsetMap.put(instances.get(offset), offset);
+      }
+    }
+  }
+
+  /**
+   * Selects instances for the table by following the pre-configured instance partitions.
+   *
+   * <p>Only replica-group based configs are accepted. After the preconditions are validated, the method either
+   * builds a fresh assignment (no existing instance partitions) or adjusts the existing one.
+   *
+   * @param poolToInstanceConfigsMap candidate instance configs keyed by pool id
+   * @param instancePartitions output object that receives the selected instances
+   * @throws IllegalStateException if the config is not replica-group based
+   */
+  @Override
+  public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
+    // Fail fast: this selector only understands replica-group based configs
+    boolean replicaGroupBased = _replicaGroupPartitionConfig.isReplicaGroupBased();
+    if (!replicaGroupBased) {
+      throw new IllegalStateException("Does not support Non-replica-group based selection");
+    }
+
+    validatePoolDiversePreconditions(poolToInstanceConfigsMap);
+
+    if (_existingInstancePartitions != null) {
+      // An assignment already exists: adjust it against the pre-configured instance partitions by looking for
+      // the pre-configured mirrored server sets that are most similar to the existing ones. This path covers
+      // instance replacement, uplift and downlift.
+      scale(instancePartitions);
+      return;
+    }
+
+    // Nothing assigned yet: pick _numTargetInstancesPerReplicaGroup mirrored server sets straight from the
+    // pre-configured instance partitions.
+    initialAssignment(instancePartitions);
+  }
+
+  /**
+   * Builds a brand-new assignment when there are no existing instance partitions.
+   *
+   * <p>A shuffle seeded with the table name hash picks {@code _numTargetInstancesPerReplicaGroup} offsets out of the
+   * pre-configured mirrored server sets. The picked offsets are sorted so the result follows the pre-configured
+   * order. Replica group {@code i} receives the i-th server of every picked mirrored set, and the result is written
+   * to partition 0 of {@code instancePartitions}.
+   *
+   * @param instancePartitions output object that receives the selected instances
+   */
+  private void initialAssignment(InstancePartitions instancePartitions) {
+    LOGGER.info("No existing instance partitions found. Will build new on top of"
+        + " the pre-configured instance partitions");
+    // Transpose the pre-configured instance partitions into mirrored server sets
+    createMirrorServerListFromPreconfiguredInstancePartition();
+
+    // Seed derived from the table name, so the same table always draws the same offsets
+    Random tableSeededRandom = new Random(Math.abs(_tableNameWithType.hashCode()));
+
+    // Candidate offsets 0 .. _numPreConfiguredInstancesPerReplicaGroup - 1
+    List<Integer> candidateOffsets = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup);
+    int nextOffset = 0;
+    while (nextOffset < _numPreConfiguredInstancesPerReplicaGroup) {
+      candidateOffsets.add(nextOffset++);
+    }
+    Collections.shuffle(candidateOffsets, tableSeededRandom);
+
+    // Keep the first _numTargetInstancesPerReplicaGroup shuffled offsets, restored to ascending order so the
+    // result follows the original order of the pre-configured instance partitions
+    List<Integer> selectedOffsets = candidateOffsets.subList(0, _numTargetInstancesPerReplicaGroup);
+    Collections.sort(selectedOffsets);
+
+    // Build every replica group from the selected mirrored sets
+    List<List<String>> replicaGroups = new ArrayList<>(_numTargetReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numTargetReplicaGroups; replicaGroupId++) {
+      List<String> instances = new ArrayList<>(_numTargetInstancesPerReplicaGroup);
+      for (Integer offset : selectedOffsets) {
+        instances.add(_preConfiguredMirroredServerLists.get(offset).get(replicaGroupId));
+      }
+      replicaGroups.add(instances);
+    }
+
+    // Publish the result only after all replica groups have been built
+    int replicaGroupId = 0;
+    for (List<String> instances : replicaGroups) {
+      instancePartitions.setInstances(0, replicaGroupId++, instances);
+    }
+  }
+
+  /**
+   * Adjusts an existing assignment so that it lines up with the pre-configured instance partitions.
+   *
+   * <p>Steps, in order:
+   * <ol>
+   *   <li>Build the mirrored server lists (servers at the same offset across replica groups) for both the
+   *       pre-configured and the existing instance partitions, plus the instance-name to offset lookup for the
+   *       pre-configured ones.</li>
+   *   <li>For each existing offset, pick the not-yet-used pre-configured offset that contains the most servers of
+   *       the existing mirrored set.</li>
+   *   <li>When the existing assignment has more instances per replica group than the target, keep only the matches
+   *       with the highest overlap counts and re-key them from 0.</li>
+   *   <li>When fewer offsets than the target are selected, fill the gaps with unused pre-configured offsets chosen
+   *       by a shuffle seeded with the table name hash.</li>
+   *   <li>Write the servers of the selected mirrored sets into partition 0 of {@code instancePartitions}.</li>
+   * </ol>
+   *
+   * @param instancePartitions output object that receives the adjusted instances
+   */
+  private void scale(InstancePartitions instancePartitions) {
+    LOGGER.info("Existing instance partitions found. Will adjust the existing 
instance partitions"
+        + " based on the pre-configured instance partitions");
+    createMirrorServerListFromPreconfiguredInstancePartition();
+    createMirrorServerListLookupTablesFromPreconfiguredInstancePartition();
+    createListAndLookupTablesFromExistingInstancePartitions();
+    // Pre-configured offsets already claimed by some result position
+    Set<Integer> usedPreconfiguredInstanceOffsets = new HashSet<>();
+    // Result position -> (selected pre-configured offset, number of existing servers found at that offset)
+    Map<Integer, Map.Entry<Integer, Long>> existingOffsetToResultTuple = new 
HashMap<>();
+
+    // For each existing offset, translate the existing mirrored servers into pre-configured offsets (servers
+    // unknown to the pre-configured partitions and offsets already taken are dropped), count the hits per
+    // offset and keep the offset with the highest count. Existing offsets without any hit get no entry.
+    for (int j = 0; j < _numExistingInstancesPerReplicaGroup; j++) {
+      List<String> existingMirroredServers = 
_existingMirroredServerLists.get(j);
+      // Effectively-final copy of the loop index for use inside the lambda
+      int finalJ = j;
+      existingMirroredServers.stream()
+          .map(_preConfiguredInstanceNameToOffsetMap::get)
+          .filter(Objects::nonNull)
+          .filter(offset -> !usedPreconfiguredInstanceOffsets.contains(offset))
+          .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()))
+          .entrySet()
+          .stream()
+          .max(Map.Entry.comparingByValue())
+          .ifPresent(e -> {
+            existingOffsetToResultTuple.put(finalJ, e);
+            usedPreconfiguredInstanceOffsets.add(e.getKey());
+          });
+    }
+
+    if (_numExistingInstancesPerReplicaGroup > 
_numTargetInstancesPerReplicaGroup) {
+      // Downlift: keep at most _numTargetInstancesPerReplicaGroup matches, preferring the highest overlap
+      // counts, and re-key them to positions 0 .. size - 1
+      List<Map.Entry<Integer, Long>> collect = 
existingOffsetToResultTuple.values()
+          .stream()
+          .sorted((a, b) -> b.getValue().compareTo(a.getValue()))
+          .limit(_numTargetInstancesPerReplicaGroup)
+          .collect(Collectors.toList());
+      int size = collect.size();
+      existingOffsetToResultTuple.clear();
+      usedPreconfiguredInstanceOffsets.clear();
+      for (int j = 0; j < size; j++) {
+        existingOffsetToResultTuple.put(j, collect.get(j));
+        usedPreconfiguredInstanceOffsets.add(collect.get(j).getKey());
+      }
+    }
+
+    if (existingOffsetToResultTuple.size() < 
_numTargetInstancesPerReplicaGroup) {
+      // Fewer offsets selected than the target number of instances per replica group: fill the missing
+      // positions with pre-configured offsets that have not been selected yet.
+      List<Integer> shuffledOffsets = new 
ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup);
+      for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+        shuffledOffsets.add(j);
+      }
+      for (Map.Entry<Integer, Map.Entry<Integer, Long>> entry : 
existingOffsetToResultTuple.entrySet()) {
+        // getKey() is a boxed Integer, so this resolves to remove(Object) and removes by value, not by index
+        shuffledOffsets.remove(entry.getValue().getKey());
+      }
+      // Seeded with the table name hash so the same table always draws the same filler offsets
+      Collections.shuffle(shuffledOffsets, new 
Random(Math.abs(_tableNameWithType.hashCode())));
+      shuffledOffsets =
+          shuffledOffsets.subList(0, _numTargetInstancesPerReplicaGroup - 
existingOffsetToResultTuple.size());
+      shuffledOffsets.sort(Comparator.naturalOrder());
+      // k walks the filler offsets, j walks the result positions; positions that already have a match are kept
+      for (int k = 0, j = 0; j < _numTargetInstancesPerReplicaGroup; j++) {
+        if (existingOffsetToResultTuple.containsKey(j)) {
+          continue;
+        }
+        Integer offset = shuffledOffsets.get(k++);
+        // Filler entries carry an overlap count of 0
+        existingOffsetToResultTuple.put(j, new 
AbstractMap.SimpleEntry<>(offset, 0L));
+        usedPreconfiguredInstanceOffsets.add(offset);
+      }
+    }
+
+    // Expand the selected mirrored sets back into per-replica-group instance lists
+    List<List<String>> resultReplicaGroups = new 
ArrayList<>(_numTargetReplicaGroups);
+    for (int i = 0; i < _numTargetReplicaGroups; i++) {
+      resultReplicaGroups.add(new 
ArrayList<>(_numTargetInstancesPerReplicaGroup));
+    }
+    for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) {
+      List<String> mirrorServers = 
_preConfiguredMirroredServerLists.get(existingOffsetToResultTuple.get(j).getKey());
+      for (int i = 0; i < _numTargetReplicaGroups; i++) {
+        resultReplicaGroups.get(i).add(mirrorServers.get(i));
+      }
+    }
+    for (int i = 0; i < _numTargetReplicaGroups; i++) {
+      instancePartitions.setInstances(0, i, resultReplicaGroups.get(i));
+    }
+  }
+
+  /**
+   * Transposes partition 0 of the existing instance partitions into "mirrored server" lists.
+   *
+   * <p>For every instance offset, one list is appended to {@code _existingMirroredServerLists} holding the instance
+   * found at that offset in each existing replica group (in replica-group order).
+   */
+  private void createListAndLookupTablesFromExistingInstancePartitions() {
+    // Collect the instance list of each existing replica group up front
+    List<List<String>> replicaGroupInstances = new ArrayList<>(_numExistingReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numExistingReplicaGroups; replicaGroupId++) {
+      List<String> instances = _existingInstancePartitions.getInstances(0, replicaGroupId);
+      replicaGroupInstances.add(instances);
+    }
+
+    // One mirrored set per offset: the servers sitting at the same position in every replica group
+    for (int offset = 0; offset < _numExistingInstancesPerReplicaGroup; offset++) {
+      List<String> mirrorSet = new ArrayList<>();
+      for (List<String> instances : replicaGroupInstances) {
+        mirrorSet.add(instances.get(offset));
+      }
+      _existingMirroredServerLists.add(mirrorSet);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a27c4ffbfb..a052794ae2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -557,31 +557,52 @@ public class TableRebalancer {
       if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, 
instancePartitionsType)) {
         boolean hasPreConfiguredInstancePartitions =
             TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType);
-        if (hasPreConfiguredInstancePartitions) {
-          String referenceInstancePartitionsName = 
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
-          InstancePartitions instancePartitions =
-              
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
-                  referenceInstancePartitionsName, instancePartitionsName);
-          boolean instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
+        boolean isPreConfigurationBasedAssignment =
+            
InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, 
instancePartitionsType);
+        InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
+        InstancePartitions instancePartitions;
+        boolean instancePartitionsUnchanged;
+        if (!hasPreConfiguredInstancePartitions) {
+          LOGGER.info("Reassigning {} instances for table: {}", 
instancePartitionsType, tableNameWithType);
+          // Assign instances with existing instance partition to null if 
bootstrap mode is enabled, so that the
+          // instance partition map can be fully recalculated.
+          instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType,
+              
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
+              bootstrap ? null : existingInstancePartitions);
+          instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
           if (!dryRun && !instancePartitionsUnchanged) {
-            LOGGER.info("Persisting instance partitions: {} (referencing {})", 
instancePartitions,
-                referenceInstancePartitionsName);
+            LOGGER.info("Persisting instance partitions: {} to ZK", 
instancePartitions);
             
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
                 instancePartitions);
           }
-          return Pair.of(instancePartitions, instancePartitionsUnchanged);
-        }
-        LOGGER.info("Reassigning {} instances for table: {}", 
instancePartitionsType, tableNameWithType);
-        InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
-        // Assign instances with existing instance partition to null if 
bootstrap mode is enabled, so that the instance
-        // partition map can be fully recalculated.
-        InstancePartitions instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType,
-            
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
-            bootstrap ? null : existingInstancePartitions);
-        boolean instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
-        if (!dryRun && !instancePartitionsUnchanged) {
-          LOGGER.info("Persisting instance partitions: {} to ZK", 
instancePartitions);
-          
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
 instancePartitions);
+        } else {
+          String referenceInstancePartitionsName = 
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
+          if (isPreConfigurationBasedAssignment) {
+            InstancePartitions preConfiguredInstancePartitions =
+                
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+                    referenceInstancePartitionsName, instancePartitionsName);
+            instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType,
+                
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
+                bootstrap ? null : existingInstancePartitions, 
preConfiguredInstancePartitions);
+            instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
+            if (!dryRun && !instancePartitionsUnchanged) {
+              LOGGER.info("Persisting instance partitions: {} (based on {})", 
instancePartitions,
+                  preConfiguredInstancePartitions);
+              
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+                  instancePartitions);
+            }
+          } else {
+            instancePartitions =
+                
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+                    referenceInstancePartitionsName, instancePartitionsName);
+            instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
+            if (!dryRun && !instancePartitionsUnchanged) {
+              LOGGER.info("Persisting instance partitions: {} (referencing 
{})", instancePartitions,
+                  referenceInstancePartitionsName);
+              
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+                  instancePartitions);
+            }
+          }
         }
         return Pair.of(instancePartitions, instancePartitionsUnchanged);
       } else {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 4335d80b14..b25a529e10 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
+import java.util.Set;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
@@ -42,9 +50,7 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 
 public class InstanceAssignmentTest {
@@ -54,6 +60,7 @@ public class InstanceAssignmentTest {
   private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
   private static final String SERVER_INSTANCE_POOL_PREFIX = "_pool_";
   private static final String TABLE_NAME_ZERO_HASH_COMPLEMENT = "12";
+  public static final Logger LOGGER = 
LogManager.getLogger(InstanceAssignmentTest.class);
 
   @Test
   public void testDefaultOfflineReplicaGroup() {
@@ -329,6 +336,957 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, 
SERVER_INSTANCE_ID_PREFIX + 0));
   }
 
+  /**
+   * Entry point for the randomized mirror-server-set run with ten million iterations. It carries no
+   * {@code @Test} annotation.
+   */
+  public void testMirrorServerSetBasedRandom() throws FileNotFoundException {
+    final int iterations = 10_000_000;
+    testMirrorServerSetBasedRandomInner(iterations);
+  }
+
+  /**
+   * Runs {@code loopCount} randomized mirror-server-set assignments and writes a trace of every iteration to
+   * {@code output.txt} in the working directory.
+   *
+   * <p>{@code System.out} is redirected to the file only for the duration of the run: the file stream is closed
+   * and the previous {@code System.out} is restored even when an iteration fails, so later tests in the same JVM
+   * keep their console output.
+   *
+   * @param loopCount number of random scenarios to run
+   * @throws FileNotFoundException if {@code output.txt} cannot be created or opened
+   */
+  public void testMirrorServerSetBasedRandomInner(int loopCount) throws FileNotFoundException {
+    PrintStream originalOut = System.out;
+    // try-with-resources closes the file; the finally block undoes the JVM-wide stdout redirection
+    try (PrintStream fileOut = new PrintStream("output.txt")) {
+      System.setOut(fileOut);
+      for (int iter = 0; iter < loopCount; iter++) {
+        runMirrorServerSetBasedRandomIteration(iter);
+      }
+    } finally {
+      System.setOut(originalOut);
+    }
+  }
+
+  /**
+   * Runs one random scenario: draws random replica-group / instance counts, builds random pre-configured and
+   * existing instance partitions, runs the assignment and checks the shape of the result.
+   */
+  private void runMirrorServerSetBasedRandomIteration(int iter) {
+    System.out.printf("_____________________________ITERATION:%d________________________________%n", iter);
+    Random random1 = new Random();
+    int numTargetReplicaGroups = random1.nextInt(7) + 1;
+    int numExistingReplicaGroups = random1.nextInt(7) + 1;
+    int numPreConfiguredInstancesPerReplicaGroup = random1.nextInt(10) + 5;
+    int numTargetInstancesPerReplicaGroup = Math.max(random1.nextInt(numPreConfiguredInstancesPerReplicaGroup), 5);
+    int numExistingInstancesPerReplicaGroup = Math.max(random1.nextInt(numPreConfiguredInstancesPerReplicaGroup), 5);
+    int numPools = random1.nextInt(10) + 1;
+
+    int numPartitions = 0;
+    int numInstancesPerPartition = 0;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+
+    int preConfiguredOffsetStart = random1.nextInt(10);
+    for (int i = 0; i < 1000; i++) {
+      int pool = i % numPools;
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+    InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numTargetReplicaGroups, numTargetInstancesPerReplicaGroup,
+            numPartitions, numInstancesPerPartition, false, null);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured"))
+        .build();
+    InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
+    InstancePartitions preConfigured = new InstancePartitions("preConfigured");
+    InstancePartitions existing = new InstancePartitions("existing");
+
+    List<String> preconfiguredInstances = new LinkedList<>();
+    List<String> existingInstances = new LinkedList<>();
+
+    Set<Integer> preConfiguredUsed = new HashSet<>();
+    Set<Integer> existingUsed = new HashSet<>();
+
+    // Ids are drawn from a range 1.5x larger than the number needed, so the retry loops always terminate
+    int preConfiguredIdBound = (int) (1.5 * numTargetReplicaGroups * numPreConfiguredInstancesPerReplicaGroup);
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      for (int j = 0; j < numPreConfiguredInstancesPerReplicaGroup; j++) {
+        int instance = random1.nextInt(preConfiguredIdBound);
+        while (preConfiguredUsed.contains(instance)) {
+          instance = random1.nextInt(preConfiguredIdBound);
+        }
+        preConfiguredUsed.add(instance);
+        preconfiguredInstances.add(SERVER_INSTANCE_ID_PREFIX + (instance + preConfiguredOffsetStart));
+      }
+    }
+
+    int existingIdBound = (int) (1.5 * numExistingReplicaGroups * numExistingInstancesPerReplicaGroup);
+    for (int i = 0; i < numExistingReplicaGroups; i++) {
+      for (int j = 0; j < numExistingInstancesPerReplicaGroup; j++) {
+        int instance = random1.nextInt(existingIdBound);
+        while (existingUsed.contains(instance)) {
+          instance = random1.nextInt(existingIdBound);
+        }
+        existingUsed.add(instance);
+        existingInstances.add(SERVER_INSTANCE_ID_PREFIX + instance);
+      }
+    }
+
+    Collections.shuffle(preconfiguredInstances);
+    Collections.shuffle(existingInstances);
+
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      preConfigured.setInstances(0, i, preconfiguredInstances.subList(i * numPreConfiguredInstancesPerReplicaGroup,
+          (i + 1) * numPreConfiguredInstancesPerReplicaGroup));
+    }
+
+    for (int i = 0; i < numExistingReplicaGroups; i++) {
+      existing.setInstances(0, i, existingInstances.subList(i * numExistingInstancesPerReplicaGroup,
+          (i + 1) * numExistingInstancesPerReplicaGroup));
+    }
+
+    System.out.println("Done initializing preconfigured and existing instances");
+    System.out.println("numTargetReplicaGroups " + numTargetReplicaGroups);
+    System.out.println("numPreConfiguredInstancesPerReplicaGroup " + numPreConfiguredInstancesPerReplicaGroup);
+    System.out.println("numTargetInstancesPerReplicaGroup " + numTargetInstancesPerReplicaGroup);
+
+    System.out.println("numExistingReplicaGroups " + numExistingReplicaGroups);
+    System.out.println("numExistingInstancesPerReplicaGroup " + numExistingInstancesPerReplicaGroup);
+    System.out.println("");
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      System.out.println("Preconfigured instances for replica group " + i + " : " + preConfigured.getInstances(0, i));
+    }
+    System.out.println("");
+    for (int i = 0; i < numExistingReplicaGroups; i++) {
+      System.out.println("Existing instances for replica group " + i + " : " + existing.getInstances(0, i));
+    }
+    System.out.println("");
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existing, preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numTargetReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      System.out.println("Assigned instances for replica group " + i + " : " + instancePartitions.getInstances(0, i));
+    }
+  }
+
+  @Test
+  public void testMirrorServerSetBased() {
+    LogManager.getLogger(MirrorServerSetInstancePartitionSelector.class)
+        .setLevel(Level.INFO);
+
+    // Test initial assignment 3 replica groups, 7 instances per rg.
+    int numPartitions = 0;
+    int numInstancesPerPartition = 0;
+    int numInstances = 21;
+    int numPools = 5;
+    int numReplicaGroups = 3;
+    int numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < 100; i++) {
+      int pool = i % numPools;
+      InstanceConfig instanceConfig =
+          new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, 
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+    InstanceTagPoolConfig tagPoolConfig = new 
InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+    InstancePartitions preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 15,
+            SERVER_INSTANCE_ID_PREFIX + 18));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13, 
SERVER_INSTANCE_ID_PREFIX + 16,
+            SERVER_INSTANCE_ID_PREFIX + 19));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14, 
SERVER_INSTANCE_ID_PREFIX + 17,
+            SERVER_INSTANCE_ID_PREFIX + 20));
+
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  3      4      5
+     *   Host  6      7      8
+     *   Host  9      10     11
+     *   Host  12     13     14
+     *   Host  15     16     17
+     *   Host  18     19     20
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  3      4      5
+     *   Host  6      7      8
+     *   Host  9      10     11
+     *   Host  12     13     14
+     *   Host  15     16     17
+     *   Host  18     19     20
+     */
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 3,
+            SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 15,
+            SERVER_INSTANCE_ID_PREFIX + 18));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 13,
+            SERVER_INSTANCE_ID_PREFIX + 16,
+            SERVER_INSTANCE_ID_PREFIX + 19));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 14,
+            SERVER_INSTANCE_ID_PREFIX + 17,
+            SERVER_INSTANCE_ID_PREFIX + 20));
+
+    // Test instance shuffling/uplifting from 3*5 to 3*7
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 21;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 15,
+            SERVER_INSTANCE_ID_PREFIX + 18));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13, 
SERVER_INSTANCE_ID_PREFIX + 16,
+            SERVER_INSTANCE_ID_PREFIX + 19));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14, 
SERVER_INSTANCE_ID_PREFIX + 17,
+            SERVER_INSTANCE_ID_PREFIX + 20));
+
+    InstancePartitions existingInstancePartitions = new 
InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 12, SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX 
+ 7, SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 14, SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+            existingInstancePartitions, preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * uplift from 15 instances in 3 replicas to 21 instances in 3 replicas
+     * 21 instances in 4 pools
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  3      4      5
+     *   Host  6      7      8
+     *   Host  9      10     11
+     *   Host  12     13     14
+     *   Host  15     16     17
+     *   Host  18     19     20
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      6      2
+     *   Host  12     7      14
+     *   Host  1      4      5
+     *   Host  3      13     8
+     *   Host  9      10     11
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  12     13     14
+     *   Host  3      4      5
+     *   Host  6      7      8
+     *   Host  9      10     11
+     *   Host  15     16     17
+     *   Host  18     19     20
+     */
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 3,
+            SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 15,
+            SERVER_INSTANCE_ID_PREFIX + 18));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 13,
+            SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 16,
+            SERVER_INSTANCE_ID_PREFIX + 19));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 14,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 17,
+            SERVER_INSTANCE_ID_PREFIX + 20));
+
+    // Test instance replacement from 3*6 to 3*5
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 15;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, 
SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 24,
+            SERVER_INSTANCE_ID_PREFIX + 27, SERVER_INSTANCE_ID_PREFIX + 30));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 19, 
SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 25,
+            SERVER_INSTANCE_ID_PREFIX + 28, SERVER_INSTANCE_ID_PREFIX + 31));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 20, 
SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 26,
+            SERVER_INSTANCE_ID_PREFIX + 29, SERVER_INSTANCE_ID_PREFIX + 32));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 15, 
SERVER_INSTANCE_ID_PREFIX + 18));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 16, 
SERVER_INSTANCE_ID_PREFIX + 19));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17, 
SERVER_INSTANCE_ID_PREFIX + 20));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * From 18 instances in 3 replicas to 15 instances in 3 replicas
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  18     19     20
+     *   Host  21     22     23
+     *   Host  24     25     26
+     *   Host  27     28     29
+     *   Host  30     31     32
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  3      4      5
+     *   Host  6      7      8
+     *   Host  9      10     11
+     *   Host  15     16     17
+     *   Host  18     19     20
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  18     19     20
+     *   Host  21     22     23
+     *   Host  24     25     26
+     *   Host  27     28     29
+     *   Host  30     31     32
+     *
+     */
+
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+            SERVER_INSTANCE_ID_PREFIX + 21,
+            SERVER_INSTANCE_ID_PREFIX + 24,
+            SERVER_INSTANCE_ID_PREFIX + 27,
+            SERVER_INSTANCE_ID_PREFIX + 30));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 22,
+            SERVER_INSTANCE_ID_PREFIX + 25,
+            SERVER_INSTANCE_ID_PREFIX + 28,
+            SERVER_INSTANCE_ID_PREFIX + 31));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 20,
+            SERVER_INSTANCE_ID_PREFIX + 23,
+            SERVER_INSTANCE_ID_PREFIX + 26,
+            SERVER_INSTANCE_ID_PREFIX + 29,
+            SERVER_INSTANCE_ID_PREFIX + 32));
+
+    // Test instance shuffling/uplifting from 3*5 to 3*6, with some instance 
replacement
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 18;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 15, 
SERVER_INSTANCE_ID_PREFIX + 18));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 16, 
SERVER_INSTANCE_ID_PREFIX + 19));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17, 
SERVER_INSTANCE_ID_PREFIX + 20));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 12, SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX 
+ 7, SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 14, SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * uplift from 15 instances in 3 replicas to 18 instances in 3 replicas
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  3      4      5
+     *   Host  6      7      8
+     *   Host  9      10     11
+     *   Host  15     16     17
+     *   Host  18     19     20
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      6      2
+     *   Host  12     7      14
+     *   Host  1      4      5
+     *   Host  3      13     8
+     *   Host  9      10     11
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  0      1      2
+     *   Host  6      7      8
+     *   Host  3      4      5
+     *   Host  15     16     17
+     *   Host  9      10     11
+     *   Host  18     19     20
+     */
+
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 3,
+            SERVER_INSTANCE_ID_PREFIX + 15,
+            SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 18));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 16,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 19));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 17,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 20));
+
+    // Test instance shuffling/uplifting from 3*5 to 4*6
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 24;
+    numReplicaGroups = 4;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 15));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13, 
SERVER_INSTANCE_ID_PREFIX + 16));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14, 
SERVER_INSTANCE_ID_PREFIX + 17));
+    preConfigured.setInstances(0, 3,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, 
SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 20,
+            SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 22, 
SERVER_INSTANCE_ID_PREFIX + 23));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 12, SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX 
+ 7, SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 14, SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * Test instance shuffling/uplifting from 3*5 to 4*6
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3    RG4
+     *   Host  0      1      2      18
+     *   Host  3      4      5      19
+     *   Host  6      7      8      20
+     *   Host  9      10     11     21
+     *   Host  12     13     14     22
+     *   Host  15     16     17     23
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  0      6      2
+     *   Host  12     7      14
+     *   Host  1      4      5
+     *   Host  3      13     8
+     *   Host  9      10     11
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3    RG4
+     *   Host  0      1      2      18
+     *   Host  12     13     14     22
+     *   Host  3      4      5      19
+     *   Host  6      7      8      20
+     *   Host  9      10     11     21
+     *   Host  15     16     17     23
+     */
+
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 3,
+            SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 15));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 13,
+            SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 16));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 14,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 17));
+    assertEquals(instancePartitions.getInstances(0, 3),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+            SERVER_INSTANCE_ID_PREFIX + 22,
+            SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 20,
+            SERVER_INSTANCE_ID_PREFIX + 21,
+            SERVER_INSTANCE_ID_PREFIX + 23));
+
+    // Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling 
of instances
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 12;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 14));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 22, SERVER_INSTANCE_ID_PREFIX + 13,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, 
SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 20,
+            SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 23));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 3,
+            SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 15));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 13,
+            SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 16));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 14,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 17));
+    existingInstancePartitions.setInstances(0, 3,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+            SERVER_INSTANCE_ID_PREFIX + 22,
+            SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 20,
+            SERVER_INSTANCE_ID_PREFIX + 21,
+            SERVER_INSTANCE_ID_PREFIX + 23));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling 
of instances
+     * Pre-configured partitioning:
+     *         RG2    RG3    RG4
+     *   Host  1      2      18
+     *   Host  4      22     19
+     *   Host  7      13     20
+     *   Host  10     11     21
+     *   Host  14     17     23
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3    RG4
+     *   Host  0      1      2      18
+     *   Host  12     13     14     22
+     *   Host  3      4      5      19
+     *   Host  6      7      8      20
+     *   Host  9      10     11     21
+     *   Host  15     16     17     23
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  1      2      18
+     *   Host  10     11     21
+     *   Host  7      13     20
+     *   Host  14     17     23
+     */
+
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 14));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13,
+            SERVER_INSTANCE_ID_PREFIX + 17));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+            SERVER_INSTANCE_ID_PREFIX + 21,
+            SERVER_INSTANCE_ID_PREFIX + 20,
+            SERVER_INSTANCE_ID_PREFIX + 23));
+
+
+    // upscale 3*3 to 3*5
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 15;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3));
+
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 6));
+
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 9));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    /*
+     * Test instance upscaling from 3 * 3 to 3 * 5
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      5      6
+     *   Host  7      8      9
+     *   Host  10     11     12
+     *   Host  13     14     15
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  1      4      7
+     *   Host  2      5      8
+     *   Host  3      6      9
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      5      6
+     *   Host  7      8      9
+     *   Host  10     11     12
+     *   Host  13     14     15
+     */
+
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+            SERVER_INSTANCE_ID_PREFIX + 4,
+            SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10,
+            SERVER_INSTANCE_ID_PREFIX + 13));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 5,
+            SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 14));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3,
+            SERVER_INSTANCE_ID_PREFIX + 6,
+            SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 15));
+
+    // downscale 3*5 to 3*3
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 9;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+
+    /*
+     * Test downscaling from 3 * 5 to 3 * 3 (3 replica groups, 5 -> 3 instances per replica group)
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      5      6
+     *   Host  7      8      9
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      5      6
+     *   Host  7      8      9
+     *   Host  10     11     12
+     *   Host  13     14     15
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      5      6
+     *   Host  7      8      9
+     */
+
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // replace instance 5 with instance 11
+    numPartitions = 0;
+    numInstancesPerPartition = 0;
+    numInstances = 9;
+    numReplicaGroups = 3;
+    numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
+            numInstancesPerPartition, false, null);
+
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+
+    preConfigured = new InstancePartitions("preConfigured");
+    preConfigured.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+    preConfigured.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 11, SERVER_INSTANCE_ID_PREFIX + 8));
+    preConfigured.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    existingInstancePartitions = new InstancePartitions("existing");
+    existingInstancePartitions.setInstances(0, 0,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+    existingInstancePartitions.setInstances(0, 1,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 5, SERVER_INSTANCE_ID_PREFIX + 8));
+    existingInstancePartitions.setInstances(0, 2,
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions,
+            preConfigured);
+
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+
+    /*
+     * Test replacing instance 5 with instance 11 in the pre-configured instance partitions (3 * 3 layout kept)
+     * Pre-configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      11     6
+     *   Host  7      8      9
+     *
+     * Existing configured partitioning:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      5      6
+     *   Host  7      8      9
+     *
+     * Final assignment for this table:
+     *         RG1    RG2    RG3
+     *   Host  1      2      3
+     *   Host  4      11     6
+     *   Host  7      8      9
+     */
+
+    // Verifying the final configuration after replacing instance 5 with instance 11
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 11, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+  }
+
   @Test
   public void testPoolBased() {
     // 10 instances in 2 pools, each with 5 instances
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index db6941bf1f..8a22aaf4ea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -85,6 +85,8 @@ public class Actions {
     public static final String UPDATE_USER = "UpdateUser";
     public static final String UPDATE_ZNODE = "UpdateZnode";
     public static final String UPLOAD_SEGMENT = "UploadSegment";
+    public static final String GET_INSTANCE_PARTITIONS = 
"GetInstancePartitions";
+    public static final String UPDATE_INSTANCE_PARTITIONS = 
"UpdateInstancePartitions";
   }
 
   // Action names for table
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 73cf7f27a1..b7d8cadc4a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -771,11 +771,25 @@ public final class TableConfigUtils {
         tableConfig.getInstanceAssignmentConfigMap())) {
       return;
     }
-    for (InstancePartitionsType instancePartitionsType : 
tableConfig.getInstancePartitionsMap().keySet()) {
-      Preconditions.checkState(
-          
!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()),
-          String.format("Both InstanceAssignmentConfigMap and 
InstancePartitionsMap set for %s",
-              instancePartitionsType));
+
+    for (InstancePartitionsType instancePartitionsType : 
InstancePartitionsType.values()) {
+      if 
(tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()))
 {
+        InstanceAssignmentConfig instanceAssignmentConfig =
+            
tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString());
+        if (instanceAssignmentConfig.getPartitionSelector()
+            == 
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR)
 {
+          Preconditions.checkState(
+              
tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType),
+              String.format("Both InstanceAssignmentConfigMap and 
InstancePartitionsMap needed for %s, as "
+                      + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used",
+                  instancePartitionsType));
+        } else {
+          Preconditions.checkState(
+              
!tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType),
+              String.format("Both InstanceAssignmentConfigMap and 
InstancePartitionsMap set for %s",
+                  instancePartitionsType));
+        }
+      }
     }
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index 186f545cea..391ba4812d 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -82,6 +82,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
   }
 
   public enum PartitionSelector {
-    FD_AWARE_INSTANCE_PARTITION_SELECTOR, 
INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR
+    FD_AWARE_INSTANCE_PARTITION_SELECTOR, 
INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
+    MIRROR_SERVER_SET_PARTITION_SELECTOR
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to