Repository: storm
Updated Branches:
  refs/heads/master 24ab227ed -> 82f7450cc


STORM-2782 - refactor partial key grouping


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d884f8a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d884f8a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d884f8a3

Branch: refs/heads/master
Commit: d884f8a3dda9f9d81064280e61eb9e56d4542e87
Parents: 7b940ae
Author: Kevin Peek <[email protected]>
Authored: Mon Oct 30 11:22:20 2017 -0400
Committer: Kevin Peek <[email protected]>
Committed: Mon Oct 30 11:22:20 2017 -0400

----------------------------------------------------------------------
 .../storm/grouping/PartialKeyGrouping.java      | 199 ++++++++++++++-----
 .../storm/grouping/PartialKeyGroupingTest.java  |  66 ------
 .../BalancedTargetSelectorTest.java             |  64 ++++++
 .../PartialKeyGroupingTest.java                 |  89 +++++++++
 .../RandomTwoTaskAssignmentCreatorTest.java     |  62 ++++++
 5 files changed, 366 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d884f8a3/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java 
b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
index e1af16d..70dfeaa 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
@@ -17,40 +17,64 @@
  */
 package org.apache.storm.grouping;
 
+import com.google.common.collect.Maps;
+
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.Fields;
 
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
 
+/**
+ * A variation on fields grouping. This grouping operates on a partitioning of the incoming
+ * tuples (like a fields grouping), but it can send Tuples from a given partition to
+ * multiple downstream tasks.
+ *
+ * Given a total pool of target tasks, this grouping will always send Tuples 
with a given
+ * key to one member of a subset of those tasks. Each key is assigned a subset 
of tasks.
+ * Each tuple is then sent to one task from that subset.
+ *
+ * Notes:
+ * - the default TaskSelector ensures each task gets as close to a balanced 
number of Tuples as possible
+ * - the default AssignmentCreator hashes the key and produces an assignment 
of two tasks
+ */
 public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
-    private static final long serialVersionUID = -447379837314000353L;
+    private static final long serialVersionUID = -1672360572274911808L;
     private List<Integer> targetTasks;
-    private long[] targetTaskStats;
-    private HashFunction h1 = Hashing.murmur3_128(13);
-    private HashFunction h2 = Hashing.murmur3_128(17);
     private Fields fields = null;
     private Fields outFields = null;
 
+    private AssignmentCreator assignmentCreator;
+    private TargetSelector targetSelector;
+
     public PartialKeyGrouping() {
-        //Empty
+        this(null);
     }
 
     public PartialKeyGrouping(Fields fields) {
+        this(fields, new RandomTwoTaskAssignmentCreator(), new 
BalancedTargetSelector());
+    }
+
+    public PartialKeyGrouping(Fields fields, AssignmentCreator 
assignmentCreator) {
+        this(fields, assignmentCreator, new BalancedTargetSelector());
+    }
+    
+    public PartialKeyGrouping(Fields fields, AssignmentCreator 
assignmentCreator, TargetSelector targetSelector) {
         this.fields = fields;
+        this.assignmentCreator = assignmentCreator;
+        this.targetSelector = targetSelector;
     }
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, 
List<Integer> targetTasks) {
         this.targetTasks = targetTasks;
-        targetTaskStats = new long[this.targetTasks.size()];
         if (this.fields != null) {
             this.outFields = context.getComponentOutputFields(stream);
         }
@@ -60,47 +84,126 @@ public class PartialKeyGrouping implements 
CustomStreamGrouping, Serializable {
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
         List<Integer> boltIds = new ArrayList<>(1);
         if (values.size() > 0) {
-            byte[] raw;
-            if (fields != null) {
-                List<Object> selectedFields = outFields.select(fields, values);
-                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 
4);
-                for (Object o: selectedFields) {
-                    if (o instanceof List) {
-                        out.putInt(Arrays.deepHashCode(((List)o).toArray()));
-                    } else if (o instanceof Object[]) {
-                        out.putInt(Arrays.deepHashCode((Object[])o));
-                    } else if (o instanceof byte[]) {
-                        out.putInt(Arrays.hashCode((byte[]) o));
-                    } else if (o instanceof short[]) {
-                        out.putInt(Arrays.hashCode((short[]) o));
-                    } else if (o instanceof int[]) {
-                        out.putInt(Arrays.hashCode((int[]) o));
-                    } else if (o instanceof long[]) {
-                        out.putInt(Arrays.hashCode((long[]) o));
-                    } else if (o instanceof char[]) {
-                        out.putInt(Arrays.hashCode((char[]) o));
-                    } else if (o instanceof float[]) {
-                        out.putInt(Arrays.hashCode((float[]) o));
-                    } else if (o instanceof double[]) {
-                        out.putInt(Arrays.hashCode((double[]) o));
-                    } else if (o instanceof boolean[]) {
-                        out.putInt(Arrays.hashCode((boolean[]) o));
-                    } else if (o != null) {
-                        out.putInt(o.hashCode());
-                    } else {
-                      out.putInt(0);
-                    }
+            final byte[] rawKeyBytes = getKeyBytes(values);
+
+            final int[] taskAssignmentForKey = 
assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
+            final int selectedTask = 
targetSelector.chooseTask(taskAssignmentForKey);
+
+            boltIds.add(selectedTask);
+        }
+        return boltIds;
+    }
+
+
+    /**
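+     * Builds the key bytes for a Tuple: the hash codes of the selected fields or, when no fields are set, the bytes of the first value's string form.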
+     */
+    private byte[] getKeyBytes(List<Object> values) {
+        byte[] raw;
+        if (fields != null) {
+            List<Object> selectedFields = outFields.select(fields, values);
+            ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
+            for (Object o: selectedFields) {
+                if (o instanceof List) {
+                    out.putInt(Arrays.deepHashCode(((List)o).toArray()));
+                } else if (o instanceof Object[]) {
+                    out.putInt(Arrays.deepHashCode((Object[])o));
+                } else if (o instanceof byte[]) {
+                    out.putInt(Arrays.hashCode((byte[]) o));
+                } else if (o instanceof short[]) {
+                    out.putInt(Arrays.hashCode((short[]) o));
+                } else if (o instanceof int[]) {
+                    out.putInt(Arrays.hashCode((int[]) o));
+                } else if (o instanceof long[]) {
+                    out.putInt(Arrays.hashCode((long[]) o));
+                } else if (o instanceof char[]) {
+                    out.putInt(Arrays.hashCode((char[]) o));
+                } else if (o instanceof float[]) {
+                    out.putInt(Arrays.hashCode((float[]) o));
+                } else if (o instanceof double[]) {
+                    out.putInt(Arrays.hashCode((double[]) o));
+                } else if (o instanceof boolean[]) {
+                    out.putInt(Arrays.hashCode((boolean[]) o));
+                } else if (o != null) {
+                    out.putInt(o.hashCode());
+                } else {
+                    out.putInt(0);
                 }
-                raw = out.array();
-            } else {
-                raw = values.get(0).toString().getBytes(); // assume key is 
the first field
             }
-            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % 
this.targetTasks.size());
-            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % 
this.targetTasks.size());
-            int selected = targetTaskStats[firstChoice] > 
targetTaskStats[secondChoice] ? secondChoice : firstChoice;
-            boltIds.add(targetTasks.get(selected));
-            targetTaskStats[selected]++;
+            raw = out.array();
+        } else {
+            raw = values.get(0).toString().getBytes(); // assume key is the 
first field
+        }
+        return raw;
+    }
+
+    /*==================================================
+     * Helper Classes
+     *==================================================*/
+
+    /**
+     * This interface is responsible for choosing a subset of the target tasks 
to use for a given key.
+     *
+     * NOTE: whatever scheme you use to create the assignment should be 
deterministic. This may be executed on multiple
+     * Storm Workers, thus each of them needs to come up with the same 
assignment for a given key.
+     */
+    public interface AssignmentCreator extends Serializable {
+        int[] createAssignment(List<Integer> targetTasks, byte[] key);
+    }
+
+    /**
+     * This interface chooses one element from a task assignment to send a 
specific Tuple to.
+     */
+    public interface TargetSelector extends Serializable {
+        Integer chooseTask(int[] assignedTasks);
+    }
+
+    /*========== Implementations ==========*/
+
+    /**
+     * This implementation of AssignmentCreator chooses two arbitrary tasks.
+     */
+    public static class RandomTwoTaskAssignmentCreator implements 
AssignmentCreator {
+        /**
+         * Creates a two-task assignment by picking tasks pseudo-randomly, seeded from the key so the result is deterministic.
+         */
+        public int[] createAssignment(List<Integer> tasks, byte[] key) {
+            // It is necessary that this produce a deterministic assignment 
based on the key, so seed the Random from the key
+            final long seedForRandom = Arrays.hashCode(key);
+            final Random random = new Random(seedForRandom);
+            final int choice1 = random.nextInt(tasks.size());
+            int choice2 = random.nextInt(tasks.size());
+            // ensure that choice1 and choice2 are not the same task
+            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : 
choice2;
+            return new int[] {tasks.get(choice1), tasks.get(choice2)};
+        }
+    }
+
+    /**
+     * A basic implementation of target selection. This strategy chooses the 
task within the assignment that has
+     * received the fewest Tuples overall from this instance of the grouping.
+     */
+    public static class BalancedTargetSelector implements TargetSelector {
+        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();
+
+        /**
+         * Chooses, from the assigned tasks, the one that has been selected
+         * the fewest times so far.
+         */
+        public Integer chooseTask(int[] assignedTasks) {
+            Integer taskIdWithMinLoad = null;
+            Long minTaskLoad = Long.MAX_VALUE;
+
+            for (Integer currentTaskId : assignedTasks) {
+                final Long currentTaskLoad = 
targetTaskStats.getOrDefault(currentTaskId, 0L);
+                if (currentTaskLoad < minTaskLoad) {
+                    minTaskLoad = currentTaskLoad;
+                    taskIdWithMinLoad = currentTaskId;
+                }
+            }
+
+            targetTaskStats.put(taskIdWithMinLoad, 
targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
+            return taskIdWithMinLoad;
         }
-        return boltIds;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d884f8a3/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java 
b/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java
deleted file mode 100644
index 0945062..0000000
--- 
a/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.grouping;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.task.WorkerTopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import com.google.common.collect.Lists;
-
-public class PartialKeyGroupingTest {
-    @Test
-    public void testChooseTasks() {
-        PartialKeyGrouping pkg = new PartialKeyGrouping();
-        pkg.prepare(null, null, Lists.newArrayList(0, 1, 2, 3, 4, 5));
-        Values message = new Values("key1");
-        List<Integer> choice1 = pkg.chooseTasks(0, message);
-        assertThat(choice1.size(), is(1));
-        List<Integer> choice2 = pkg.chooseTasks(0, message);
-        assertThat(choice2, is(not(choice1)));
-        List<Integer> choice3 = pkg.chooseTasks(0, message);
-        assertThat(choice3, is(not(choice2)));
-        assertThat(choice3, is(choice1));
-    }
-
-    @Test
-    public void testChooseTasksFields() {
-        PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test"));
-        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-        
when(context.getComponentOutputFields(any(GlobalStreamId.class))).thenReturn(new
 Fields("test"));
-        pkg.prepare(context, mock(GlobalStreamId.class), Lists.newArrayList(0, 
1, 2, 3, 4, 5));
-        Values message = new Values("key1");
-        List<Integer> choice1 = pkg.chooseTasks(0, message);
-        assertThat(choice1.size(), is(1));
-        List<Integer> choice2 = pkg.chooseTasks(0, message);
-        assertThat(choice2, is(not(choice1)));
-        List<Integer> choice3 = pkg.chooseTasks(0, message);
-        assertThat(choice3, is(not(choice2)));
-        assertThat(choice3, is(choice1));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d884f8a3/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java
 
b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java
new file mode 100644
index 0000000..2b880d5
--- /dev/null
+++ 
b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping.partialKeyGrouping;
+
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.utils.Utils;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class BalancedTargetSelectorTest {
+
+
+    private static final int[] TASK_LIST = {9, 8, 7, 6};
+
+    private final PartialKeyGrouping.TargetSelector targetSelector = new 
PartialKeyGrouping.BalancedTargetSelector();
+
+    @Test
+    public void classIsSerializable() throws Exception {
+        Utils.javaSerialize(targetSelector);
+    }
+
+    @Test
+    public void selectorReturnsTasksInAssignment() {
+        // select tasks once more than the number of tasks available
+        for (int i = 0; i < TASK_LIST.length + 1; i++) {
+            int selectedTask = targetSelector.chooseTask(TASK_LIST);
+            assertThat(selectedTask, 
Matchers.in(Arrays.stream(TASK_LIST).boxed().collect(Collectors.toList())));
+        }
+    }
+
+    @Test
+    public void selectsTaskThatHasBeenUsedTheLeast() {
+        // ensure that the first three tasks have been selected before
+        targetSelector.chooseTask(new int[] {TASK_LIST[0]});
+        targetSelector.chooseTask(new int[] {TASK_LIST[1]});
+        targetSelector.chooseTask(new int[] {TASK_LIST[2]});
+
+        // now, selecting from the full set should cause the fourth task to be 
chosen.
+        int selectedTask = targetSelector.chooseTask(TASK_LIST);
+        assertThat(selectedTask, equalTo(TASK_LIST[3]));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d884f8a3/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java
 
b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java
new file mode 100644
index 0000000..405b0a4
--- /dev/null
+++ 
b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping.partialKeyGrouping;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PartialKeyGroupingTest {
+
+    @Test
+    public void testGroupingIsSerializable() throws Exception {
+        PartialKeyGrouping grouping = new PartialKeyGrouping(new 
Fields("some_field"));
+        Utils.javaSerialize(grouping);
+    }
+
+    @Test
+    public void testChooseTasks() {
+        PartialKeyGrouping pkg = new PartialKeyGrouping();
+        pkg.prepare(null, null, Lists.newArrayList(0, 1, 2, 3, 4, 5));
+        Values message = new Values("key1");
+        List<Integer> choice1 = pkg.chooseTasks(0, message);
+        assertThat(choice1.size(), is(1));
+        List<Integer> choice2 = pkg.chooseTasks(0, message);
+        assertThat(choice2, is(not(choice1)));
+        List<Integer> choice3 = pkg.chooseTasks(0, message);
+        assertThat(choice3, is(not(choice2)));
+        assertThat(choice3, is(choice1));
+    }
+
+    @Test
+    public void testChooseTasksWithoutConsecutiveTaskIds() {
+        PartialKeyGrouping pkg = new PartialKeyGrouping();
+        pkg.prepare(null, null, Lists.newArrayList(9, 8, 7, 1, 2, 3));
+        Values message = new Values("key1");
+        List<Integer> choice1 = pkg.chooseTasks(0, message);
+        assertThat(choice1.size(), is(1));
+        List<Integer> choice2 = pkg.chooseTasks(0, message);
+        assertThat(choice2, is(not(choice1)));
+        List<Integer> choice3 = pkg.chooseTasks(0, message);
+        assertThat(choice3, is(not(choice2)));
+        assertThat(choice3, is(choice1));
+    }
+
+    @Test
+    public void testChooseTasksFields() {
+        PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test"));
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        
when(context.getComponentOutputFields(any(GlobalStreamId.class))).thenReturn(new
 Fields("test"));
+        pkg.prepare(context, mock(GlobalStreamId.class), Lists.newArrayList(0, 
1, 2, 3, 4, 5));
+        Values message = new Values("key1");
+        List<Integer> choice1 = pkg.chooseTasks(0, message);
+        assertThat(choice1.size(), is(1));
+        List<Integer> choice2 = pkg.chooseTasks(0, message);
+        assertThat(choice2, is(not(choice1)));
+        List<Integer> choice3 = pkg.chooseTasks(0, message);
+        assertThat(choice3, is(not(choice2)));
+        assertThat(choice3, is(choice1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d884f8a3/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java
 
b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java
new file mode 100644
index 0000000..332fed5
--- /dev/null
+++ 
b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping.partialKeyGrouping;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+public class RandomTwoTaskAssignmentCreatorTest {
+
+    private static final byte[] GROUPING_KEY_ONE = "some_key_one".getBytes();
+    private static final byte[] GROUPING_KEY_TWO = "some_key_two".getBytes();
+
+    @Test
+    public void classIsSerializable() throws Exception {
+        PartialKeyGrouping.AssignmentCreator assignmentCreator = new 
PartialKeyGrouping.RandomTwoTaskAssignmentCreator();
+        Utils.javaSerialize(assignmentCreator);
+    }
+
+    @Test
+    public void returnsAssignmentOfExpectedSize() {
+        PartialKeyGrouping.AssignmentCreator assignmentCreator = new 
PartialKeyGrouping.RandomTwoTaskAssignmentCreator();
+        int[] assignedTasks = 
assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), 
GROUPING_KEY_ONE);
+        assertThat(assignedTasks.length, equalTo(2));
+    }
+
+    @Test
+    public void returnsDifferentAssignmentForDifferentKeys() {
+        PartialKeyGrouping.AssignmentCreator assignmentCreator = new 
PartialKeyGrouping.RandomTwoTaskAssignmentCreator();
+        int[] assignmentOne = 
assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), 
GROUPING_KEY_ONE);
+        int[] assignmentTwo = 
assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), 
GROUPING_KEY_TWO);
+        assertThat(assignmentOne, not(equalTo(assignmentTwo)));
+    }
+
+    @Test
+    public void returnsSameAssignmentForSameKey() {
+        PartialKeyGrouping.AssignmentCreator assignmentCreator = new 
PartialKeyGrouping.RandomTwoTaskAssignmentCreator();
+        int[] assignmentOne = 
assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), 
GROUPING_KEY_ONE);
+        int[] assignmentOneAgain = 
assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), 
GROUPING_KEY_ONE);
+        assertThat(assignmentOne, equalTo(assignmentOneAgain));
+    }
+}

Reply via email to