This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7a39165 StreamPartitionAssignmentStrategy interface to plug in
various partition assignment strategies (uniform, replica group based) (#4211)
7a39165 is described below
commit 7a391659faf15010b9594886a281592886ca1d62
Author: Neha Pawar <[email protected]>
AuthorDate: Mon May 20 14:37:11 2019 -0700
StreamPartitionAssignmentStrategy interface to plug in various partition
assignment strategies (uniform, replica group based) (#4211)
Phase 1 - Step 2 of https://github.com/apache/incubator-pinot/issues/4192
---
...roupBasedStreamPartitionAssignmentStrategy.java | 82 ++++++++++++
.../StreamPartitionAssignmentGenerator.java | 51 +-------
.../StreamPartitionAssignmentStrategy.java | 39 ++++++
.../StreamPartitionAssignmentStrategyFactory.java | 46 +++++++
.../UniformStreamPartitionAssignmentStrategy.java | 66 ++++++++++
...icaGroupBasedStreamPartitionAssignmentTest.java | 140 +++++++++++++++++++++
.../StreamPartitionAssignmentGeneratorTest.java | 45 ++++++-
.../UniformStreamPartitionAssignmentTest.java | 94 ++++++++++++++
.../ReplicaGroupRebalanceSegmentStrategy.java | 11 +-
9 files changed, 518 insertions(+), 56 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentStrategy.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentStrategy.java
new file mode 100644
index 0000000..43fe69a
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentStrategy.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+
+
+/**
+ * Replica group based partition assignment strategy for realtime partitions.
+ * <p>
+ * A "vertical slice" {@code i} consists of the i-th instance of every replica group (in replica group order). Each
+ * stream partition is assigned one vertical slice, i.e. one instance from each replica group.
+ */
+public class ReplicaGroupBasedStreamPartitionAssignmentStrategy implements StreamPartitionAssignmentStrategy {
+
+  /**
+   * Fetches the replica group partition assignment znode and assigns partitions across the replica groups.
+   * A vertical slice will be picked from the replica group sets for a partition, based on the formula:
+   * vertical slice = partition % numInstancesPerReplicaGroup
+   * <p>
+   * The number of instances per partition follows the number of replica groups in the replica group partition
+   * assignment; {@code numReplicas} and {@code instances} are not used by this strategy.
+   *
+   * @param helixManager helix manager used to read the replica group partition assignment
+   * @param tableNameWithType table name with type suffix
+   * @param partitions partition ids, each parsed with {@link Integer#parseInt(String)}
+   * @param numReplicas not used; the number of replica groups determines the instances per partition
+   * @param instances not used; instances come from the replica group partition assignment
+   * @return the partition assignment; every partition holds its own copy of its instance list
+   * @throws InvalidConfigException if the replica group partition assignment is missing, has no replica groups,
+   *         has an empty first replica group, or has a replica group smaller than the first one
+   */
+  @Override
+  public PartitionAssignment getStreamPartitionAssignment(HelixManager helixManager, @Nonnull String tableNameWithType,
+      @Nonnull List<String> partitions, int numReplicas, List<String> instances) throws InvalidConfigException {
+
+    ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment =
+        getReplicaGroupPartitionAssignment(helixManager, tableNameWithType);
+    if (replicaGroupPartitionAssignment == null) {
+      throw new InvalidConfigException("ReplicaGroupPartitionAssignment is null for table:" + tableNameWithType);
+    }
+    int numReplicaGroups = replicaGroupPartitionAssignment.getNumReplicaGroups();
+    if (numReplicaGroups <= 0) {
+      throw new InvalidConfigException(
+          "ReplicaGroupPartitionAssignment has no replica groups for table:" + tableNameWithType);
+    }
+
+    // NOTE(review): the first argument of getInstancesFromReplicaGroup is always 0 here (presumably the replica
+    // group partition id); only that entry is consulted.
+    // TODO: we might move to a model of not having the same numInstancesPerReplicaGroup.
+    // We would have to handle numInstancesInReplicaGroup on a replica group by replica group basis, and uniformly
+    // assign within each replica group
+    List<String> firstReplicaGroup = replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(0, 0);
+    int numInstancesPerReplicaGroup = (firstReplicaGroup == null) ? 0 : firstReplicaGroup.size();
+    if (numInstancesPerReplicaGroup == 0) {
+      // Without this check the modulo below would divide by zero
+      throw new InvalidConfigException("No instances in replica group 0 for table:" + tableNameWithType);
+    }
+
+    // verticalSlices.get(i) holds the i-th instance of every replica group, in replica group order
+    List<List<String>> verticalSlices = new ArrayList<>(numInstancesPerReplicaGroup);
+    for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+      verticalSlices.add(new ArrayList<>(numReplicaGroups));
+    }
+    for (int replicaGroupNumber = 0; replicaGroupNumber < numReplicaGroups; replicaGroupNumber++) {
+      List<String> instancesFromReplicaGroup =
+          replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(0, replicaGroupNumber);
+      if (instancesFromReplicaGroup == null || instancesFromReplicaGroup.size() < numInstancesPerReplicaGroup) {
+        // Report a config error instead of failing with an IndexOutOfBoundsException below
+        throw new InvalidConfigException(
+            "Replica group " + replicaGroupNumber + " has fewer than " + numInstancesPerReplicaGroup
+                + " instances for table:" + tableNameWithType);
+      }
+      for (int serverIndex = 0; serverIndex < numInstancesPerReplicaGroup; serverIndex++) {
+        verticalSlices.get(serverIndex).add(instancesFromReplicaGroup.get(serverIndex));
+      }
+    }
+
+    PartitionAssignment streamPartitionAssignment = new PartitionAssignment(tableNameWithType);
+    for (String partition : partitions) {
+      int verticalSlice = Integer.parseInt(partition) % numInstancesPerReplicaGroup;
+      // Copy so that partitions mapped to the same vertical slice do not share one mutable list
+      streamPartitionAssignment.addPartition(partition, new ArrayList<>(verticalSlices.get(verticalSlice)));
+    }
+    return streamPartitionAssignment;
+  }
+
+  /**
+   * Reads the replica group partition assignment of the table from the helix property store.
+   * Overridden in tests to supply a fixed assignment.
+   *
+   * @return the replica group partition assignment, or {@code null} if the generator returns none
+   */
+  @VisibleForTesting
+  protected ReplicaGroupPartitionAssignment getReplicaGroupPartitionAssignment(HelixManager helixManager,
+      String tableNameWithType) {
+    ReplicaGroupPartitionAssignmentGenerator replicaGroupPartitionAssignmentGenerator =
+        new ReplicaGroupPartitionAssignmentGenerator(helixManager.getHelixPropertyStore());
+    return replicaGroupPartitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
index 888fbda..0844e24 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGenerator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.common.partition;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -32,7 +31,6 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.RealtimeTagConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.exception.InvalidConfigException;
-import org.apache.pinot.common.utils.EqualityUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -71,8 +69,6 @@ public class StreamPartitionAssignmentGenerator {
/**
* Generates a map of partition id to latest llc segment
- * @param idealState
- * @return
*/
@VisibleForTesting
public Map<String, LLCSegmentName> getPartitionToLatestSegments(IdealState
idealState) {
@@ -114,8 +110,6 @@ public class StreamPartitionAssignmentGenerator {
public PartitionAssignment generateStreamPartitionAssignment(TableConfig
tableConfig, int numPartitions)
throws InvalidConfigException {
- // TODO: add an override which can read from znode, instead of generating
on the fly
-
List<String> partitions = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
partitions.add(String.valueOf(i));
@@ -123,49 +117,12 @@ public class StreamPartitionAssignmentGenerator {
String tableNameWithType = tableConfig.getTableName();
int numReplicas =
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-
List<String> consumingTaggedInstances =
getConsumingTaggedInstances(tableConfig);
- if (consumingTaggedInstances.size() < numReplicas) {
- throw new InvalidConfigException(
- "Not enough consuming instances tagged. Must be atleast equal to
numReplicas:" + numReplicas);
- }
- /**
- * TODO: We will use only uniform assignment for now
- * This will be refactored as AssignmentStrategy interface and
implementations UniformAssignment, BalancedAssignment etc
- * {@link StreamPartitionAssignmentGenerator} and AssignmentStrategy
interface will together replace
- * StreamPartitionAssignmentGenerator and StreamPartitionAssignmentStrategy
- */
- return uniformAssignment(tableNameWithType, partitions, numReplicas,
consumingTaggedInstances);
- }
-
- /**
- * Uniformly sprays the partitions and replicas across given list of
instances
- * Picks starting point based on table hash value. This ensures that we will
always pick the same starting point,
- * and return consistent assignment across calls
- * @param allInstances
- * @param partitions
- * @param numReplicas
- * @return
- */
- private PartitionAssignment uniformAssignment(String tableName, List<String>
partitions, int numReplicas,
- List<String> allInstances) {
-
- PartitionAssignment partitionAssignment = new
PartitionAssignment(tableName);
-
- Collections.sort(allInstances);
-
- int numInstances = allInstances.size();
- int serverId = Math.abs(EqualityUtils.hashCodeOf(tableName)) %
numInstances;
- for (String partition : partitions) {
- List<String> instances = new ArrayList<>(numReplicas);
- for (int r = 0; r < numReplicas; r++) {
- instances.add(allInstances.get(serverId));
- serverId = (serverId + 1) % numInstances;
- }
- partitionAssignment.addPartition(partition, instances);
- }
- return partitionAssignment;
+ StreamPartitionAssignmentStrategy streamPartitionAssignmentStrategy =
+
StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+ return
streamPartitionAssignmentStrategy.getStreamPartitionAssignment(_helixManager,
tableNameWithType, partitions,
+ numReplicas, consumingTaggedInstances);
}
@VisibleForTesting
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategy.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategy.java
new file mode 100644
index 0000000..ea12517
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+
+
+// TODO: Unify the interfaces for org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy and
+// StreamPartitionAssignmentStrategy
+/**
+ * Creates a partition assignment for the partitions of a realtime table
+ */
+public interface StreamPartitionAssignmentStrategy {
+
+  // TODO: pass current partition assignment to add smarts which can minimize shuffle
+  /**
+   * Given the list of partitions and replicas, come up with a {@link PartitionAssignment}.
+   * <p>
+   * Implementations may use only a subset of the arguments. A strategy that reads its layout from Helix may ignore
+   * {@code numReplicas} and {@code instances}. A strategy that spreads partitions over {@code instances} may not
+   * need {@code helixManager}.
+   *
+   * @param helixManager helix manager, for strategies that need to read cluster metadata
+   * @param tableNameWithType table name with type suffix
+   * @param partitions partition ids to assign
+   * @param numReplicas requested number of replicas per partition
+   * @param instances candidate instances to assign partitions to
+   * @return the partition assignment for the given partitions
+   * @throws InvalidConfigException if the table's configuration does not allow a valid assignment
+   */
+  PartitionAssignment getStreamPartitionAssignment(HelixManager helixManager, @Nonnull String tableNameWithType,
+      @Nonnull List<String> partitions, int numReplicas, List<String> instances) throws InvalidConfigException;
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategyFactory.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategyFactory.java
new file mode 100644
index 0000000..6a10ced
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/StreamPartitionAssignmentStrategyFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType;
+
+
+/**
+ * Chooses the {@link StreamPartitionAssignmentStrategy} implementation for a table based on its table config.
+ */
+public class StreamPartitionAssignmentStrategyFactory {
+
+  /**
+   * Picks the stream partition assignment strategy for {@code tableConfig}.
+   *
+   * @param tableConfig table config whose validation config names the segment assignment strategy
+   * @return a {@link ReplicaGroupBasedStreamPartitionAssignmentStrategy} when the configured segment assignment
+   *         strategy equals (ignoring case) the replica group segment assignment strategy type; otherwise, including
+   *         when the validation config or the strategy is null, a {@link UniformStreamPartitionAssignmentStrategy}
+   */
+  static StreamPartitionAssignmentStrategy getStreamPartitionAssignmentStrategy(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    String configuredStrategy = (validationConfig == null) ? null : validationConfig.getSegmentAssignmentStrategy();
+    if (configuredStrategy == null) {
+      return new UniformStreamPartitionAssignmentStrategy();
+    }
+    String replicaGroupStrategy = SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy.toString();
+    return configuredStrategy.equalsIgnoreCase(replicaGroupStrategy)
+        ? new ReplicaGroupBasedStreamPartitionAssignmentStrategy()
+        : new UniformStreamPartitionAssignmentStrategy();
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentStrategy.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentStrategy.java
new file mode 100644
index 0000000..5ab10e0
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentStrategy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.utils.EqualityUtils;
+
+
+/**
+ * Uniform partition assignment strategy implementation which uniformly sprays partitions across available hosts
+ */
+public class UniformStreamPartitionAssignmentStrategy implements StreamPartitionAssignmentStrategy {
+
+  /**
+   * Uniformly sprays the partitions and replicas across given list of instances.
+   * Picks starting point based on table hash value. This ensures that we will always pick the same starting point,
+   * and return consistent assignment across calls.
+   * <p>
+   * The instances are sorted on a private copy, so the caller's list is left untouched. Replicas of consecutive
+   * partitions are then taken round-robin from the sorted list, starting at an index derived from the table name
+   * hash.
+   *
+   * @param helixManager not used by this strategy
+   * @param tableNameWithType table name with type suffix, used to pick the starting instance
+   * @param partitions partition ids to assign, in assignment order
+   * @param numReplicas number of instances to assign to each partition
+   * @param allTaggedInstances candidate instances; not modified
+   * @return the partition assignment
+   * @throws InvalidConfigException if there are no tagged instances, or fewer tagged instances than numReplicas
+   */
+  @Override
+  public PartitionAssignment getStreamPartitionAssignment(HelixManager helixManager, @Nonnull String tableNameWithType,
+      @Nonnull List<String> partitions, int numReplicas, List<String> allTaggedInstances)
+      throws InvalidConfigException {
+
+    if (allTaggedInstances.size() < numReplicas) {
+      throw new InvalidConfigException(
+          "Not enough consuming instances tagged for UniformStreamPartitionAssignment. "
+              + "Must be at least equal to numReplicas:" + numReplicas);
+    }
+    if (allTaggedInstances.isEmpty()) {
+      // Reachable only when numReplicas <= 0; without this check the modulo below would divide by zero
+      throw new InvalidConfigException(
+          "No consuming instances tagged for UniformStreamPartitionAssignment for table:" + tableNameWithType);
+    }
+
+    PartitionAssignment partitionAssignment = new PartitionAssignment(tableNameWithType);
+
+    // Sort a copy: the caller's list must not be reordered (and may be unmodifiable)
+    List<String> sortedInstances = new ArrayList<>(allTaggedInstances);
+    Collections.sort(sortedInstances);
+    int numInstances = sortedInstances.size();
+    // Math.abs(hash % n) equals Math.abs(hash) % n for every hash except Integer.MIN_VALUE, for which Math.abs(hash)
+    // is still negative. This form keeps existing assignments stable and never yields a negative index.
+    int serverId = Math.abs(EqualityUtils.hashCodeOf(tableNameWithType) % numInstances);
+    for (String partition : partitions) {
+      List<String> instances = new ArrayList<>(numReplicas);
+      for (int r = 0; r < numReplicas; r++) {
+        instances.add(sortedInstances.get(serverId));
+        serverId = (serverId + 1) % numInstances;
+      }
+      partitionAssignment.addPartition(partition, instances);
+    }
+    return partitionAssignment;
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentTest.java
new file mode 100644
index 0000000..aa4c7ec
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/partition/ReplicaGroupBasedStreamPartitionAssignmentTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link ReplicaGroupBasedStreamPartitionAssignmentStrategy}
+ */
+public class ReplicaGroupBasedStreamPartitionAssignmentTest {
+
+  @Test
+  public void testReplicaGroupBasedStreamPartitionAssignmentStrategy() throws InvalidConfigException {
+    MockReplicaGroupBasedStreamPartitionAssignmentStrategy mockStreamPartitionAssignmentStrategy =
+        new MockReplicaGroupBasedStreamPartitionAssignmentStrategy();
+
+    // Realtime table with 2 replicas, 6 partitions, 4 numInstancesPerReplicaGroup.
+    // 8 servers distributed across 2 replica groups into 2 sets of 4.
+    String tableNameWithType = "tableName_REALTIME";
+    List<String> partitions = Lists.newArrayList("0", "1", "2", "3", "4", "5");
+    int numReplicas = 2;
+    List<String> allTaggedInstances =
+        Lists.newArrayList("server_1", "server_2", "server_3", "server_4", "server_5", "server_6", "server_7",
+            "server_8");
+
+    ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment =
+        new ReplicaGroupPartitionAssignment(tableNameWithType);
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 0,
+        Lists.newArrayList("server_1", "server_2", "server_3", "server_4"));
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 1,
+        Lists.newArrayList("server_5", "server_6", "server_7", "server_8"));
+
+    // null replica group partition assignment
+    mockStreamPartitionAssignmentStrategy.setReplicaGroupPartitionAssignment(null);
+    try {
+      mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+          numReplicas, allTaggedInstances);
+      Assert.fail("Expected InvalidConfigException when the replica group partition assignment is null");
+    } catch (InvalidConfigException e) {
+      // expected
+    }
+
+    // Partition p is expected on the (p % 4)-th instance of each replica group
+    List<List<String>> expectedAssignment = new ArrayList<>();
+    expectedAssignment.add(Lists.newArrayList("server_1", "server_5"));
+    expectedAssignment.add(Lists.newArrayList("server_2", "server_6"));
+    expectedAssignment.add(Lists.newArrayList("server_3", "server_7"));
+    expectedAssignment.add(Lists.newArrayList("server_4", "server_8"));
+    expectedAssignment.add(Lists.newArrayList("server_1", "server_5"));
+    expectedAssignment.add(Lists.newArrayList("server_2", "server_6"));
+
+    // mismatch between numReplicas and numReplicaGroups - follow the replica group assignment
+    mockStreamPartitionAssignmentStrategy.setReplicaGroupPartitionAssignment(replicaGroupPartitionAssignment);
+    PartitionAssignment streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions, 5,
+            allTaggedInstances);
+    assertAssignment(streamPartitionAssignment, expectedAssignment);
+
+    // happy path - correctly generated partition assignment
+    streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+    assertAssignment(streamPartitionAssignment, expectedAssignment);
+
+    // 0 partitions
+    streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType,
+            Collections.emptyList(), numReplicas, allTaggedInstances);
+    Assert.assertEquals(streamPartitionAssignment.getNumPartitions(), 0);
+
+    // only 1 instance per replica group: every partition lands on the single vertical slice
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 0, Lists.newArrayList("server_1"));
+    replicaGroupPartitionAssignment.setInstancesToReplicaGroup(0, 1, Lists.newArrayList("server_2"));
+    mockStreamPartitionAssignmentStrategy.setReplicaGroupPartitionAssignment(replicaGroupPartitionAssignment);
+    streamPartitionAssignment =
+        mockStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+    List<String> verticalSlice = Lists.newArrayList("server_1", "server_2");
+    assertAssignment(streamPartitionAssignment, Collections.nCopies(partitions.size(), verticalSlice));
+  }
+
+  /**
+   * Asserts that the assignment has exactly {@code expected.size()} partitions and that partition
+   * {@code String.valueOf(p)} is assigned {@code expected.get(p)}.
+   */
+  private static void assertAssignment(PartitionAssignment partitionAssignment, List<List<String>> expected) {
+    Assert.assertEquals(partitionAssignment.getNumPartitions(), expected.size());
+    for (int p = 0; p < expected.size(); p++) {
+      Assert.assertEquals(partitionAssignment.getInstancesListForPartition(String.valueOf(p)), expected.get(p));
+    }
+  }
+
+  /**
+   * Strategy whose replica group partition assignment is set directly instead of being read through Helix.
+   * Static, so it does not capture a reference to the enclosing test instance.
+   */
+  private static class MockReplicaGroupBasedStreamPartitionAssignmentStrategy
+      extends ReplicaGroupBasedStreamPartitionAssignmentStrategy {
+    private ReplicaGroupPartitionAssignment _replicaGroupPartitionAssignment;
+
+    @Override
+    protected ReplicaGroupPartitionAssignment getReplicaGroupPartitionAssignment(HelixManager helixManager,
+        String tableNameWithType) {
+      return _replicaGroupPartitionAssignment;
+    }
+
+    void setReplicaGroupPartitionAssignment(ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment) {
+      _replicaGroupPartitionAssignment = replicaGroupPartitionAssignment;
+    }
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
index 2ac46cc..4562418 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/partition/StreamPartitionAssignmentGeneratorTest.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TenantConfig;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -64,6 +65,46 @@ public class StreamPartitionAssignmentGeneratorTest {
}
}
+  @Test
+  public void testStreamPartitionAssignmentStrategyFactory() {
+    TableConfig tableConfig = mock(TableConfig.class);
+
+    // null validation config (the unstubbed mock returns null)
+    StreamPartitionAssignmentStrategy streamPartitionAssignmentStrategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(streamPartitionAssignmentStrategy instanceof UniformStreamPartitionAssignmentStrategy);
+
+    // null segment assignment strategy; this validation config stub stays in place for every case below
+    SegmentsValidationAndRetentionConfig mockValidationConfig = mock(SegmentsValidationAndRetentionConfig.class);
+    when(tableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
+    streamPartitionAssignmentStrategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(streamPartitionAssignmentStrategy instanceof UniformStreamPartitionAssignmentStrategy);
+
+    // incorrect segment assignment strategy
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn("someThing Wrong");
+    streamPartitionAssignmentStrategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(streamPartitionAssignmentStrategy instanceof UniformStreamPartitionAssignmentStrategy);
+
+    // Unsupported type
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn(
+        CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.BalanceNumSegmentAssignmentStrategy.toString());
+    streamPartitionAssignmentStrategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(streamPartitionAssignmentStrategy instanceof UniformStreamPartitionAssignmentStrategy);
+
+    // ReplicaGroup
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn(
+        CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy.toString());
+    streamPartitionAssignmentStrategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(streamPartitionAssignmentStrategy instanceof ReplicaGroupBasedStreamPartitionAssignmentStrategy);
+
+    // ReplicaGroup, matched case-insensitively
+    when(mockValidationConfig.getSegmentAssignmentStrategy()).thenReturn(
+        CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy.toString()
+            .toLowerCase(java.util.Locale.ROOT));
+    streamPartitionAssignmentStrategy =
+        StreamPartitionAssignmentStrategyFactory.getStreamPartitionAssignmentStrategy(tableConfig);
+    Assert.assertTrue(streamPartitionAssignmentStrategy instanceof ReplicaGroupBasedStreamPartitionAssignmentStrategy);
+  }
+
/**
* Given an ideal state, constructs the partition assignment for the table
*/
@@ -286,7 +327,7 @@ public class StreamPartitionAssignmentGeneratorTest {
int serverId = 0;
for (int p = 0; p < partitionAssignment.getNumPartitions(); p++) {
for (String instance :
partitionAssignment.getInstancesListForPartition(String.valueOf(p))) {
- Assert.assertTrue(instance.equals(instancesUsed.get(serverId++)),
+ Assert.assertEquals(instancesUsed.get(serverId++), instance,
"Uniform strategy test failed for table " + tableName);
if (serverId == instancesUsed.size()) {
serverId = 0;
@@ -305,7 +346,7 @@ public class StreamPartitionAssignmentGeneratorTest {
private List<String> _consumingInstances;
- public TestStreamPartitionAssignmentGenerator(HelixManager helixManager) {
+ TestStreamPartitionAssignmentGenerator(HelixManager helixManager) {
super(helixManager);
_consumingInstances = new ArrayList<>();
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentTest.java
new file mode 100644
index 0000000..5bb3da6
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/partition/UniformStreamPartitionAssignmentTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link UniformStreamPartitionAssignmentStrategy}
+ */
+public class UniformStreamPartitionAssignmentTest {
+
+  /**
+   * Tests that we validate necessary variables and honor the uniform sticky assignment.
+   */
+  @Test
+  public void testUniformStreamPartitionAssignmentStrategy() throws InvalidConfigException {
+    UniformStreamPartitionAssignmentStrategy uniformStreamPartitionAssignmentStrategy =
+        new UniformStreamPartitionAssignmentStrategy();
+
+    String tableNameWithType = "tableName_REALTIME";
+    List<String> partitions = Lists.newArrayList("0", "1", "2", "3", "4", "5");
+    int numReplicas = 3;
+    List<String> allTaggedInstances =
+        Lists.newArrayList("server_1", "server_2", "server_3", "server_4", "server_5", "server_6", "server_7",
+            "server_8");
+
+    // num replicas more than tagged instances
+    try {
+      uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions, 10,
+          allTaggedInstances);
+      Assert.fail("Expected InvalidConfigException when numReplicas exceeds the number of tagged instances");
+    } catch (InvalidConfigException e) {
+      // expected
+    }
+
+    // 0 partitions
+    PartitionAssignment uniformPartitionAssignment =
+        uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType,
+            Collections.emptyList(), numReplicas, allTaggedInstances);
+    Assert.assertEquals(uniformPartitionAssignment.getNumPartitions(), 0);
+
+    uniformPartitionAssignment =
+        uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+    Assert.assertEquals(uniformPartitionAssignment.getNumPartitions(), partitions.size());
+
+    // every partition gets numReplicas distinct instances
+    for (String partition : partitions) {
+      List<String> instances = uniformPartitionAssignment.getInstancesListForPartition(partition);
+      Assert.assertEquals(instances.size(), numReplicas);
+      Assert.assertEquals(instances.stream().distinct().count(), (long) numReplicas);
+    }
+
+    // sticky: repeating the call with the same inputs yields the same assignment
+    PartitionAssignment repeatedPartitionAssignment =
+        uniformStreamPartitionAssignmentStrategy.getStreamPartitionAssignment(null, tableNameWithType, partitions,
+            numReplicas, allTaggedInstances);
+    for (String partition : partitions) {
+      Assert.assertEquals(repeatedPartitionAssignment.getInstancesListForPartition(partition),
+          uniformPartitionAssignment.getInstancesListForPartition(partition));
+    }
+
+    // uniform: every tagged instance is used ...
+    List<String> instancesUsed = new ArrayList<>();
+    for (int p = 0; p < uniformPartitionAssignment.getNumPartitions(); p++) {
+      for (String instance : uniformPartitionAssignment.getInstancesListForPartition(String.valueOf(p))) {
+        if (!instancesUsed.contains(instance)) {
+          instancesUsed.add(instance);
+        }
+      }
+    }
+    Assert.assertTrue(instancesUsed.containsAll(allTaggedInstances));
+
+    // ... and instances are handed out round-robin in order of first use
+    int serverIndex = 0;
+    for (int p = 0; p < uniformPartitionAssignment.getNumPartitions(); p++) {
+      List<String> instancesListForPartition =
+          uniformPartitionAssignment.getInstancesListForPartition(String.valueOf(p));
+      for (String instance : instancesListForPartition) {
+        Assert.assertEquals(instance, instancesUsed.get(serverIndex++));
+        if (serverIndex == instancesUsed.size()) {
+          serverIndex = 0;
+        }
+      }
+    }
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
index 9b7d8e3..a0f1985 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
@@ -392,8 +392,8 @@ public class ReplicaGroupRebalanceSegmentStrategy
implements RebalanceSegmentStr
}
// Update Idealstate with rebalanced segment assignment
- Map<String, Map<String, String>> serverToSegmentsMapping =
buildSegmentToServerMapping(serverToSegments);
- for (Map.Entry<String, Map<String, String>> entry :
serverToSegmentsMapping.entrySet()) {
+ Map<String, Map<String, String>> newSegmentToServersMap =
buildSegmentToServerMapping(serverToSegments);
+ for (Map.Entry<String, Map<String, String>> entry :
newSegmentToServersMap.entrySet()) {
idealState.setInstanceStateMap(entry.getKey(), entry.getValue());
}
idealState.setReplicas(Integer.toString(numReplicaGroups));
@@ -519,11 +519,8 @@ public class ReplicaGroupRebalanceSegmentStrategy
implements RebalanceSegmentStr
for (Map.Entry<String, LinkedList<String>> entry :
serverToSegments.entrySet()) {
String server = entry.getKey();
for (String segment : entry.getValue()) {
- if (!segmentsToServerMapping.containsKey(segment)) {
- segmentsToServerMapping.put(segment, new HashMap<String, String>());
- }
- segmentsToServerMapping.get(segment)
- .put(server,
CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE);
+ Map<String, String> serverToStateMap =
segmentsToServerMapping.computeIfAbsent(segment, k -> new HashMap<>());
+ serverToStateMap.put(server,
CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE);
}
}
return segmentsToServerMapping;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]