This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a39165  StreamPartitionAssignmentStrategy interface to plug in 
various partition assignment strategies (uniform, replica group based) (#4211)
7a39165 is described below

commit 7a391659faf15010b9594886a281592886ca1d62
Author: Neha Pawar <[email protected]>
AuthorDate: Mon May 20 14:37:11 2019 -0700

    StreamPartitionAssignmentStrategy interface to plug in various partition 
assignment strategies (uniform, replica group based) (#4211)
    
    Phase 1 - Step 2 of https://github.com/apache/incubator-pinot/issues/4192
---
 ...roupBasedStreamPartitionAssignmentStrategy.java |  82 ++++++++++++
 .../StreamPartitionAssignmentGenerator.java        |  51 +-------
 .../StreamPartitionAssignmentStrategy.java         |  39 ++++++
 .../StreamPartitionAssignmentStrategyFactory.java  |  46 +++++++
 .../UniformStreamPartitionAssignmentStrategy.java  |  66 ++++++++++
 ...icaGroupBasedStreamPartitionAssignmentTest.java | 140 +++++++++++++++++++++
 .../StreamPartitionAssignmentGeneratorTest.java    |  45 ++++++-
 .../UniformStreamPartitionAssignmentTest.java      |  94 ++++++++++++++
 .../ReplicaGroupRebalanceSegmentStrategy.java      |  11 +-
 9 files changed, 518 insertions(+), 56 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentStrategy.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentStrategy.java
new file mode 100644
index 0000000..43fe69a
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentStrategy.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+
+
+/**
+ * Replica group based partition assignment strategy for realtime partitions.
+ *
+ * <p>Every stream partition is assigned to a "vertical slice" of the table's replica groups: the instances found at
+ * the same position in each replica group of the stored {@link ReplicaGroupPartitionAssignment}.
+ */
+public class ReplicaGroupBasedStreamPartitionAssignmentStrategy implements StreamPartitionAssignmentStrategy {
+
+  /**
+   * Fetches the replica group partition assignment znode and assigns partitions across the replica groups.
+   * A vertical slice will be picked from the replica group sets for a partition, based on the formula:
+   * vertical slice = partition % numInstancesPerReplicaGroup
+   *
+   * <p>Only replica group partition 0 is consulted. The {@code numReplicas} and {@code instances} arguments are
+   * ignored: each partition gets one instance from every configured replica group.
+   *
+   * @param helixManager used to read the replica group partition assignment from the property store
+   * @param tableNameWithType table name with type
+   * @param partitions stream partition ids; each must parse as a non-negative integer
+   * @param numReplicas ignored by this strategy
+   * @param instances ignored by this strategy
+   * @return the assignment of each partition to one instance from every replica group
+   * @throws InvalidConfigException if the replica group partition assignment is missing, has no replica groups, or
+   *     has a replica group with no instances or with fewer instances than replica group 0
+   */
+  @Override
+  public PartitionAssignment getStreamPartitionAssignment(HelixManager helixManager, @Nonnull String tableNameWithType,
+      @Nonnull List<String> partitions, int numReplicas, List<String> instances) throws InvalidConfigException {
+
+    ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment =
+        getReplicaGroupPartitionAssignment(helixManager, tableNameWithType);
+    if (replicaGroupPartitionAssignment == null) {
+      throw new InvalidConfigException("ReplicaGroupPartitionAssignment is null for table:" + tableNameWithType);
+    }
+    int numReplicaGroups = replicaGroupPartitionAssignment.getNumReplicaGroups();
+    if (numReplicaGroups <= 0) {
+      throw new InvalidConfigException(
+          "ReplicaGroupPartitionAssignment has no replica groups for table:" + tableNameWithType);
+    }
+
+    // TODO: we might move to a model of not having the same numInstancesPerReplicaGroup.
+    // We would have to handle numInstancesInReplicaGroup on a replica group by replica group basis, and uniformly
+    // assign within each replica group
+    int numInstancesPerReplicaGroup =
+        getNonEmptyReplicaGroup(replicaGroupPartitionAssignment, 0, tableNameWithType).size();
+
+    // verticalSlices.get(i) holds the i-th instance of every replica group, in replica group order
+    List<List<String>> verticalSlices = new ArrayList<>(numInstancesPerReplicaGroup);
+    for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+      verticalSlices.add(new ArrayList<>(numReplicaGroups));
+    }
+
+    for (int replicaGroupNumber = 0; replicaGroupNumber < numReplicaGroups; replicaGroupNumber++) {
+      List<String> instancesFromReplicaGroup =
+          getNonEmptyReplicaGroup(replicaGroupPartitionAssignment, replicaGroupNumber, tableNameWithType);
+      // Instances beyond numInstancesPerReplicaGroup are ignored, as before. A smaller group would leave the vertical
+      // slices incomplete (previously an IndexOutOfBoundsException), so reject it with a clear config error.
+      if (instancesFromReplicaGroup.size() < numInstancesPerReplicaGroup) {
+        throw new InvalidConfigException(
+            "Replica group:" + replicaGroupNumber + " has " + instancesFromReplicaGroup.size()
+                + " instances, expected at least " + numInstancesPerReplicaGroup + " for table:"
+                + tableNameWithType);
+      }
+      for (int serverIndex = 0; serverIndex < numInstancesPerReplicaGroup; serverIndex++) {
+        verticalSlices.get(serverIndex).add(instancesFromReplicaGroup.get(serverIndex));
+      }
+    }
+
+    PartitionAssignment streamPartitionAssignment = new PartitionAssignment(tableNameWithType);
+    for (String partition : partitions) {
+      int verticalSlice = Integer.parseInt(partition) % numInstancesPerReplicaGroup;
+      // Copy the slice so partitions mapped to the same slice never share one mutable list object
+      // (NOTE(review): in case PartitionAssignment keeps the list by reference)
+      streamPartitionAssignment.addPartition(partition, new ArrayList<>(verticalSlices.get(verticalSlice)));
+    }
+    return streamPartitionAssignment;
+  }
+
+  /**
+   * Returns the instances of the given replica group (replica group partition 0), failing if there are none.
+   */
+  private static List<String> getNonEmptyReplicaGroup(ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment,
+      int replicaGroupNumber, String tableNameWithType) throws InvalidConfigException {
+    List<String> instancesFromReplicaGroup =
+        replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(0, replicaGroupNumber);
+    if (instancesFromReplicaGroup == null || instancesFromReplicaGroup.isEmpty()) {
+      throw new InvalidConfigException(
+          "No instances found in replica group:" + replicaGroupNumber + " for table:" + tableNameWithType);
+    }
+    return instancesFromReplicaGroup;
+  }
+
+  /**
+   * Reads the table's {@link ReplicaGroupPartitionAssignment} through the Helix property store.
+   * Overridable so tests can inject an assignment without Helix.
+   */
+  @VisibleForTesting
+  protected ReplicaGroupPartitionAssignment getReplicaGroupPartitionAssignment(HelixManager helixManager,
+      String tableNameWithType) {
+    ReplicaGroupPartitionAssignmentGenerator replicaGroupPartitionAssignmentGenerator =
+        new ReplicaGroupPartitionAssignmentGenerator(helixManager.getHelixPropertyStore());
+    return replicaGroupPartitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
index 888fbda..0844e24 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.common.partition;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -32,7 +31,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
-import org.apache.pinot.common.utils.EqualityUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 
@@ -71,8 +69,6 @@ public class StreamPartitionAssignmentGenerator {
 
   /**
    * Generates a map of partition id to latest llc segment
-   * @param idealState
-   * @return
    */
   @VisibleForTesting
   public Map<String, LLCSegmentName> getPartitionToLatestSegments(IdealState 
idealState) {
@@ -114,8 +110,6 @@ public class StreamPartitionAssignmentGenerator {
   public PartitionAssignment generateStreamPartitionAssignment(TableConfig 
tableConfig, int numPartitions)
       throws InvalidConfigException {
 
-    // TODO: add an override which can read from znode, instead of generating 
on the fly
-
     List<String> partitions = new ArrayList<>(numPartitions);
     for (int i = 0; i < numPartitions; i++) {
       partitions.add(String.valueOf(i));
@@ -123,49 +117,12 @@ public class StreamPartitionAssignmentGenerator {
 
     String tableNameWithType = tableConfig.getTableName();
     int numReplicas = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-
     List<String> consumingTaggedInstances = 
getConsumingTaggedInstances(tableConfig);
-    if (consumingTaggedInstances.size() < numReplicas) {
-      throw new InvalidConfigException(
-          "Not enough consuming instances tagged. Must be atleast equal to 
numReplicas:" + numReplicas);
-    }
 
-    /**
-     * TODO: We will use only uniform assignment for now
-     * This will be refactored as AssignmentStrategy interface and 
implementations UniformAssignment, BalancedAssignment etc
-     * {@link StreamPartitionAssignmentGenerator} and AssignmentStrategy 
interface will together replace
-     * StreamPartitionAssignmentGenerator and StreamPartitionAssignmentStrategy
-     */
-    return uniformAssignment(tableNameWithType, partitions, numReplicas, 
consumingTaggedInstances);
-  }
-
-  /**
-   * Uniformly sprays the partitions and replicas across given list of 
instances
-   * Picks starting point based on table hash value. This ensures that we will 
always pick the same starting point,
-   * and return consistent assignment across calls
-   * @param allInstances
-   * @param partitions
-   * @param numReplicas
-   * @return
-   */
-  private PartitionAssignment uniformAssignment(String tableName, List<String> 
partitions, int numReplicas,
-      List<String> allInstances) {
-
-    PartitionAssignment partitionAssignment = new 
PartitionAssignment(tableName);
-
-    Collections.sort(allInstances);
-
-    int numInstances = allInstances.size();
-    int serverId = Math.abs(EqualityUtils.hashCodeOf(tableName)) % 
numInstances;
-    for (String partition : partitions) {
-      List<String> instances = new ArrayList<>(numReplicas);
-      for (int r = 0; r < numReplicas; r++) {
-        instances.add(allInstances.get(serverId));
-        serverId = (serverId + 1) % numInstances;
-      }
-      partitionAssignment.addPartition(partition, instances);
-    }
-    return partitionAssignment;
+    StreamPartitionAssignmentStrategy streamPartitionAssignmentStrategy =
+        
StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    return 
streamPartitionAssignmentStrategy.getStreamPartitionAssignment(_helixManager, 
tableNameWithType, partitions,
+        numReplicas, consumingTaggedInstances);
   }
 
   @VisibleForTesting
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategy.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategy.java
new file mode 100644
index 0000000..ea12517
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+
+
+/**
+ * Creates a partition assignment for the partitions of a realtime table.
+ */
+// TODO: Unify the interfaces for org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy and
+//  StreamPartitionAssignmentStrategy
+public interface StreamPartitionAssignmentStrategy {
+
+  /**
+   * Given the list of partitions and replicas, come up with a {@link PartitionAssignment}.
+   *
+   * @param helixManager Helix manager; implementations that do not need cluster state may ignore it
+   * @param tableNameWithType table name with type (e.g. {@code myTable_REALTIME})
+   * @param partitions ids of the stream partitions to assign
+   * @param numReplicas desired number of replicas per partition; an implementation may derive it elsewhere instead
+   * @param instances candidate instances; an implementation may derive its instances elsewhere instead
+   * @return the partition assignment for the table
+   * @throws InvalidConfigException if the table's configuration does not allow a valid assignment
+   */
+  // TODO: pass current partition assignment to add smarts which can minimize shuffle
+  PartitionAssignment getStreamPartitionAssignment(HelixManager helixManager, @Nonnull String tableNameWithType,
+      @Nonnull List<String> partitions, int numReplicas, List<String> instances) throws InvalidConfigException;
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategyFactory.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategyFactory.java
new file mode 100644
index 0000000..6a10ced
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategyFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType;
+
+
+/**
+ * Factory that picks the {@link StreamPartitionAssignmentStrategy} implementation matching a table config.
+ */
+public class StreamPartitionAssignmentStrategyFactory {
+
+  /**
+   * Chooses the {@link StreamPartitionAssignmentStrategy} for a table.
+   *
+   * <p>Returns a {@link ReplicaGroupBasedStreamPartitionAssignmentStrategy} when the table's validation config names
+   * the replica group segment assignment strategy (compared case-insensitively). In every other case, including a
+   * missing validation config or a missing/unrecognized strategy, returns a
+   * {@link UniformStreamPartitionAssignmentStrategy}.
+   */
+  static StreamPartitionAssignmentStrategy getStreamPartitionAssignmentStrategy(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    String configuredStrategy = (validationConfig == null) ? null : validationConfig.getSegmentAssignmentStrategy();
+    String replicaGroupStrategy = SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy.toString();
+    boolean useReplicaGroups =
+        configuredStrategy != null && configuredStrategy.equalsIgnoreCase(replicaGroupStrategy);
+    if (!useReplicaGroups) {
+      return new UniformStreamPartitionAssignmentStrategy();
+    }
+    return new ReplicaGroupBasedStreamPartitionAssignmentStrategy();
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentStrategy.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentStrategy.java
new file mode 100644
index 0000000..5ab10e0
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentStrategy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.utils.EqualityUtils;
+
+
+/**
+ * Uniform partition assignment strategy implementation which uniformly sprays partitions across available hosts
+ */
+public class UniformStreamPartitionAssignmentStrategy implements StreamPartitionAssignmentStrategy {
+
+  /**
+   * Uniformly sprays the partitions and replicas across given list of instances.
+   * Picks starting point based on table hash value. This ensures that we will always pick the same starting point,
+   * and return consistent assignment across calls.
+   *
+   * <p>Instances are considered in sorted order. Consecutive replicas, continuing across partitions, go to
+   * consecutive instances, wrapping around at the end of the list. The caller's instance list is not modified.
+   *
+   * @param helixManager ignored by this strategy
+   * @param tableNameWithType table name with type; its hash picks the starting instance
+   * @param partitions stream partition ids, assigned in list order
+   * @param numReplicas number of instances to assign to each partition
+   * @param allTaggedInstances candidate consuming instances
+   * @return the assignment of each partition to {@code numReplicas} instances
+   * @throws InvalidConfigException if there are fewer instances than {@code numReplicas}, or no instances at all
+   */
+  @Override
+  public PartitionAssignment getStreamPartitionAssignment(HelixManager helixManager, @Nonnull String tableNameWithType,
+      @Nonnull List<String> partitions, int numReplicas, List<String> allTaggedInstances)
+      throws InvalidConfigException {
+
+    if (allTaggedInstances.size() < numReplicas) {
+      throw new InvalidConfigException(
+          "Not enough consuming instances tagged for UniformStreamPartitionAssignment. "
+              + "Must be at least equal to numReplicas:" + numReplicas);
+    }
+    if (allTaggedInstances.isEmpty()) {
+      // Only reachable when numReplicas <= 0; without this check the modulo below would divide by zero
+      throw new InvalidConfigException(
+          "No consuming instances tagged for UniformStreamPartitionAssignment for table:" + tableNameWithType);
+    }
+
+    PartitionAssignment partitionAssignment = new PartitionAssignment(tableNameWithType);
+
+    // Sort a copy: reordering the caller's list is a surprising side effect and fails for unmodifiable lists
+    List<String> sortedInstances = new ArrayList<>(allTaggedInstances);
+    Collections.sort(sortedInstances);
+    int numInstances = sortedInstances.size();
+    // Math.abs(Integer.MIN_VALUE) is still negative, which would give a negative starting index. Shift such a
+    // remainder into range; every other hash keeps exactly the starting index it had before.
+    int serverId = Math.abs(EqualityUtils.hashCodeOf(tableNameWithType)) % numInstances;
+    if (serverId < 0) {
+      serverId += numInstances;
+    }
+    for (String partition : partitions) {
+      List<String> instances = new ArrayList<>(numReplicas);
+      for (int r = 0; r < numReplicas; r++) {
+        instances.add(sortedInstances.get(serverId));
+        serverId = (serverId + 1) % numInstances;
+      }
+      partitionAssignment.addPartition(partition, instances);
+    }
+    return partitionAssignment;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentTest.java
new file mode 100644
index 0000000..aa4c7ec
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link ReplicaGroupBasedStreamPartitionAssignmentStrategy}
+ */
+public class ReplicaGroupBasedStreamPartitionAssignmentTest {
+
+  @Test
+  public void testReplicaGroupBasedStreamPartitionAssignmentStrategy() throws InvalidConfigException {
+    MockReplicaGroupBasedStreamPartitionAssignmentStrategy mockStreamPartitionAssignmentStrategy =
+        new MockReplicaGroupBasedStreamPartitionAssignmentStrategy();
+
+    // Realtime table with 2 replicas, 6 partitions, 4 numInstancesPerReplicaGroup.
+    // 8 servers distributed across 2 replica groups into 2 sets of 4.
+    String tableNameWithType = "tableName_REALTIME";
+    List<String> partitions = Lists.newArrayList("0", "1", "2", "3", "4", "5");
+    int numReplicas = 2;
+    List<String> allTaggedInstances =
+        Lists.newArrayList("server_1", "server_2", "server_3", "server_4", "server_5", "server_6", "server_7",
+            "server_8");
+
+    ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment =
+        new ReplicaGroupPartitionAssignment(tableNameWithType);
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 0,
+        Lists.newArrayList("server_1", "server_2", "server_3", "server_4"));
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 1,
+        Lists.newArrayList("server_5", "server_6", "server_7", "server_8"));
+
+    // null replica group partition assignment
+    mockStreamPartitionAssignmentStrategy.setReplicaGroupPartitionAssignment(null);
+    try {
+      mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+          numReplicas, allTaggedInstances);
+      Assert.fail("Expected InvalidConfigException for a null ReplicaGroupPartitionAssignment");
+    } catch (InvalidConfigException e) {
+      // expected
+    }
+
+    // Partition p is expected on the (p % 4)'th instance of each replica group
+    List<List<String>> expectedInstances = Lists.<List<String>>newArrayList(
+        Lists.newArrayList("server_1", "server_5"),
+        Lists.newArrayList("server_2", "server_6"),
+        Lists.newArrayList("server_3", "server_7"),
+        Lists.newArrayList("server_4", "server_8"),
+        Lists.newArrayList("server_1", "server_5"),
+        Lists.newArrayList("server_2", "server_6"));
+
+    // mismatch between numReplicas and numReplicaGroups - follow the replica group assignment
+    mockStreamPartitionAssignmentStrategy.setReplicaGroupPartitionAssignment(replicaGroupPartitionAssignment);
+    PartitionAssignment streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions, 5,
+            allTaggedInstances);
+    assertPartitionInstances(streamPartitionAssignment, expectedInstances);
+
+    // happy path - correctly generated partition assignment
+    streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+    assertPartitionInstances(streamPartitionAssignment, expectedInstances);
+
+    // 0 partitions
+    streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType,
+            Collections.emptyList(), numReplicas, allTaggedInstances);
+    Assert.assertEquals(streamPartitionAssignment.getNumPartitions(), 0);
+
+    // only 1 instance per replica group: every partition maps to the single vertical slice
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 0, Lists.newArrayList("server_1"));
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 1, Lists.newArrayList("server_2"));
+    mockStreamPartitionAssignmentStrategy.setReplicaGroupPartitionAssignment(replicaGroupPartitionAssignment);
+    streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+    ArrayList<String> verticalSlice = Lists.newArrayList("server_1", "server_2");
+    assertPartitionInstances(streamPartitionAssignment,
+        Collections.<List<String>>nCopies(partitions.size(), verticalSlice));
+  }
+
+  /**
+   * Asserts that the assignment has exactly {@code expectedInstances.size()} partitions and that partition {@code i}
+   * is assigned {@code expectedInstances.get(i)}.
+   */
+  private static void assertPartitionInstances(PartitionAssignment partitionAssignment,
+      List<List<String>> expectedInstances) {
+    Assert.assertEquals(partitionAssignment.getNumPartitions(), expectedInstances.size());
+    for (int partition = 0; partition < expectedInstances.size(); partition++) {
+      Assert.assertEquals(partitionAssignment.getInstancesListForPartition(String.valueOf(partition)),
+          expectedInstances.get(partition));
+    }
+  }
+
+  /**
+   * Strategy whose replica group partition assignment is injected directly instead of being read from Helix.
+   * Static nested so it carries no hidden reference to the enclosing test instance.
+   */
+  private static class MockReplicaGroupBasedStreamPartitionAssignmentStrategy
+      extends ReplicaGroupBasedStreamPartitionAssignmentStrategy {
+    private ReplicaGroupPartitionAssignment _replicaGroupPartitionAssignment;
+
+    @Override
+    protected ReplicaGroupPartitionAssignment getReplicaGroupPartitionAssignment(HelixManager helixManager,
+        String tableNameWithType) {
+      return _replicaGroupPartitionAssignment;
+    }
+
+    void setReplicaGroupPartitionAssignment(ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment) {
+      _replicaGroupPartitionAssignment = replicaGroupPartitionAssignment;
+    }
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
index 2ac46cc..4562418 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TenantConfig;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -64,6 +65,46 @@ public class StreamPartitionAssignmentGeneratorTest {
     }
   }
 
+  @Test
+  public void testStreamPartitionAssignmentStrategyFactory() {
+    TableConfig tableConfig = mock(TableConfig.class);
+
+    // No validation config at all -> uniform
+    assertStrategyFromFactory(tableConfig, UniformStreamPartitionAssignmentStrategy.class);
+
+    // Validation config present but no segment assignment strategy -> uniform
+    SegmentsValidationAndRetentionConfig mockValidationConfig = mock(SegmentsValidationAndRetentionConfig.class);
+    when(tableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
+    assertStrategyFromFactory(tableConfig, UniformStreamPartitionAssignmentStrategy.class);
+
+    // Unrecognized segment assignment strategy -> uniform
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn("someThing Wrong");
+    assertStrategyFromFactory(tableConfig, UniformStreamPartitionAssignmentStrategy.class);
+
+    // Known but unsupported segment assignment strategy -> uniform
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn(
+        CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.BalanceNumSegmentAssignmentStrategy.toString());
+    assertStrategyFromFactory(tableConfig, UniformStreamPartitionAssignmentStrategy.class);
+
+    // Replica group segment assignment strategy -> replica group based
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn(
+        CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy.toString());
+    assertStrategyFromFactory(tableConfig, ReplicaGroupBasedStreamPartitionAssignmentStrategy.class);
+  }
+
+  /**
+   * Runs the factory on {@code tableConfig} and asserts that the returned strategy is an {@code expectedType}.
+   */
+  private static void assertStrategyFromFactory(TableConfig tableConfig,
+      Class<? extends StreamPartitionAssignmentStrategy> expectedType) {
+    StreamPartitionAssignmentStrategy strategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(expectedType.isInstance(strategy));
+  }
+
   /**
    * Given an ideal state, constructs the partition assignment for the table
    */
@@ -286,7 +327,7 @@ public class StreamPartitionAssignmentGeneratorTest {
     int serverId = 0;
     for (int p = 0; p < partitionAssignment.getNumPartitions(); p++) {
       for (String instance : 
partitionAssignment.getInstancesListForPartition(String.valueOf(p))) {
-        Assert.assertTrue(instance.equals(instancesUsed.get(serverId++)),
+        Assert.assertEquals(instancesUsed.get(serverId++), instance,
             "Uniform strategy test failed for table " + tableName);
         if (serverId == instancesUsed.size()) {
           serverId = 0;
@@ -305,7 +346,7 @@ public class StreamPartitionAssignmentGeneratorTest {
 
     private List<String> _consumingInstances;
 
-    public TestStreamPartitionAssignmentGenerator(HelixManager helixManager) {
+    TestStreamPartitionAssignmentGenerator(HelixManager helixManager) {
       super(helixManager);
       _consumingInstances = new ArrayList<>();
     }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentTest.java
new file mode 100644
index 0000000..5bb3da6
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link UniformStreamPartitionAssignmentStrategy}
+ */
+public class UniformStreamPartitionAssignmentTest {
+
+  /**
+   * Tests that we validate necessary variables and honor the uniform sticky 
assignment
+   * @throws InvalidConfigException
+   */
+  @Test
+  public void testUniformStreamPartitionAssignmentStrategy() throws 
InvalidConfigException {
+    UniformStreamPartitionAssignmentStrategy 
uniformStreamPartitionAssignmentStrategy =
+        new UniformStreamPartitionAssignmentStrategy();
+
+    String tableNameWithType = "tableName_REALTIME";
+    List<String> partitions = Lists.newArrayList("0", "1", "2", "3", "4", "5");
+    int numReplicas = 3;
+    List<String> allTaggedInstances =
+        Lists.newArrayList("server_1", "server_2", "server_3", "server_4", 
"server_5", "server_6", "server_7",
+            "server_8");
+
+    // num replicas more than tagged instances
+    boolean exception = false;
+    try {
+      
uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, 
tableNameWithType, partitions, 10,
+          allTaggedInstances);
+    } catch (InvalidConfigException e) {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+
+    // 0 partitions
+    PartitionAssignment uniformPartitionAssignment =
+        
uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, 
tableNameWithType,
+            Collections.emptyList(), numReplicas, allTaggedInstances);
+    Assert.assertEquals(uniformPartitionAssignment.getNumPartitions(), 0);
+
+    // verify sticky uniform assignment
+    uniformPartitionAssignment =
+        
uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, 
tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+
+    List<String> instancesUsed = new ArrayList<>();
+    for (int p = 0; p < uniformPartitionAssignment.getNumPartitions(); p++) {
+      for (String instance : 
uniformPartitionAssignment.getInstancesListForPartition(String.valueOf(p))) {
+        if (!instancesUsed.contains(instance)) {
+          instancesUsed.add(instance);
+        }
+      }
+    }
+    Assert.assertTrue(instancesUsed.containsAll(allTaggedInstances));
+
+    int serverIndex = 0;
+    for (int p = 0; p < uniformPartitionAssignment.getNumPartitions(); p++) {
+      List<String> instancesListForPartition =
+          
uniformPartitionAssignment.getInstancesListForPartition(String.valueOf(p));
+      for (String instance : instancesListForPartition) {
+        Assert.assertEquals(instance, instancesUsed.get(serverIndex++));
+        if (serverIndex == instancesUsed.size()) {
+          serverIndex = 0;
+        }
+      }
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
index 9b7d8e3..a0f1985 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
@@ -392,8 +392,8 @@ public class ReplicaGroupRebalanceSegmentStrategy 
implements RebalanceSegmentStr
     }
 
     // Update Idealstate with rebalanced segment assignment
-    Map<String, Map<String, String>> serverToSegmentsMapping = 
buildSegmentToServerMapping(serverToSegments);
-    for (Map.Entry<String, Map<String, String>> entry : 
serverToSegmentsMapping.entrySet()) {
+    Map<String, Map<String, String>> newSegmentToServersMap = 
buildSegmentToServerMapping(serverToSegments);
+    for (Map.Entry<String, Map<String, String>> entry : 
newSegmentToServersMap.entrySet()) {
       idealState.setInstanceStateMap(entry.getKey(), entry.getValue());
     }
     idealState.setReplicas(Integer.toString(numReplicaGroups));
@@ -519,11 +519,8 @@ public class ReplicaGroupRebalanceSegmentStrategy 
implements RebalanceSegmentStr
     for (Map.Entry<String, LinkedList<String>> entry : 
serverToSegments.entrySet()) {
       String server = entry.getKey();
       for (String segment : entry.getValue()) {
-        if (!segmentsToServerMapping.containsKey(segment)) {
-          segmentsToServerMapping.put(segment, new HashMap<String, String>());
-        }
-        segmentsToServerMapping.get(segment)
-            .put(server, 
CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE);
+        Map<String, String> serverToStateMap = 
segmentsToServerMapping.computeIfAbsent(segment, k -> new HashMap<>());
+        serverToStateMap.put(server, 
CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE);
       }
     }
     return segmentsToServerMapping;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to