Repository: samza Updated Branches: refs/heads/master 944c70878 -> 71004e1eb
SAMZA-1176; Make TestJoinOperator unit tests safe for concurrent execution Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #105 from prateekm/join-tests Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/71004e1e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/71004e1e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/71004e1e Branch: refs/heads/master Commit: 71004e1eb07bd57dfd5f5e72148476cbf646f3c3 Parents: 944c708 Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Fri Mar 31 14:00:30 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Fri Mar 31 14:00:30 2017 -0700 ---------------------------------------------------------------------- .../samza/operators/TestJoinOperator.java | 117 ++++++++++++------- 1 file changed, 76 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/71004e1e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 07c6a97..4e6c750 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -26,13 +26,14 @@ import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; -import org.junit.Before; import org.junit.Test; import java.time.Duration; @@ -46,54 +47,47 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestJoinOperator { - private final MessageCollector messageCollector = mock(MessageCollector.class); private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); - private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - - private StreamOperatorTask sot; - private List<Integer> output = new ArrayList<>(); private final ApplicationRunner runner = mock(ApplicationRunner.class); - - - @Before - public void setup() throws Exception { - output.clear(); - - TaskContext taskContext = mock(TaskContext.class); - when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet - .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), - new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); - Config config = mock(Config.class); - - StreamApplication sgb = new TestStreamApplication(); - sot = new StreamOperatorTask(sgb, runner); - sot.init(config, taskContext); - } + private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @Test - public void join() { + public void join() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to second stream with same keys numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); + int outputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(outputSum, 110); + assertEquals(110, outputSum); } @Test - public void joinReverse() { + public void joinReverse() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to second stream numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to first stream with same keys numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); int outputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(outputSum, 110); + assertEquals(110, outputSum); } @Test - public void joinNoMatch() { + public void joinNoMatch() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to second stream with different keys @@ -103,7 +97,11 @@ public class TestJoinOperator { } @Test - public void joinNoMatchReverse() { + public void joinNoMatchReverse() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to second stream numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to first stream with different keys @@ -113,7 +111,11 @@ public class TestJoinOperator { } @Test - public void joinRetainsLatestMessageForKey() { + public void joinRetainsLatestMessageForKey() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to first stream again with same keys but different values @@ -122,11 +124,15 @@ public class TestJoinOperator { numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); int outputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(outputSum, 165); // should use latest messages in the first stream + assertEquals(165, outputSum); // should use latest messages in the first stream } @Test - public void joinRetainsLatestMessageForKeyReverse() { + public void joinRetainsLatestMessageForKeyReverse() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to second stream numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to second stream again with same keys but different values @@ -135,47 +141,59 @@ public class TestJoinOperator { numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); int outputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(outputSum, 165); // should use latest messages in the second stream + assertEquals(165, outputSum); // should use latest messages in the second stream } @Test - public void joinRetainsMatchedMessages() { + public void joinRetainsMatchedMessages() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to second stream with same key numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); int outputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(outputSum, 110); + assertEquals(110, outputSum); output.clear(); // push messages to first stream with same keys once again. numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); int newOutputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(newOutputSum, 110); // should produce the same output as before + assertEquals(110, newOutputSum); // should produce the same output as before } @Test - public void joinRetainsMatchedMessagesReverse() { + public void joinRetainsMatchedMessagesReverse() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); // push messages to second stream with same key numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); int outputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(outputSum, 110); + assertEquals(110, outputSum); output.clear(); // push messages to second stream with same keys once again. numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); int newOutputSum = output.stream().reduce(0, (s, m) -> s + m); - assertEquals(newOutputSum, 110); // should produce the same output as before + assertEquals(110, newOutputSum); // should produce the same output as before } @Test public void joinRemovesExpiredMessages() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); @@ -191,6 +209,10 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessagesReverse() throws Exception { + StreamOperatorTask sot = createStreamOperatorTask(); + List<Integer> output = new ArrayList<>(); + MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); + // push messages to second stream numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); @@ -203,6 +225,19 @@ public class TestJoinOperator { assertTrue(output.isEmpty()); } + private StreamOperatorTask createStreamOperatorTask() throws Exception { + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), + new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); + Config config = mock(Config.class); + + StreamApplication sgb = new TestStreamApplication(); + StreamOperatorTask sot = new StreamOperatorTask(sgb, runner); + sot.init(config, taskContext); + return sot; + } + private class TestStreamApplication implements StreamApplication { StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem"); StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2"); @@ -212,11 +247,11 @@ public class TestJoinOperator { MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null); MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null); + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); inStream .join(inStream2, new TestJoinFunction(), Duration.ofMillis(10)) - .map(m -> { - output.add(m); - return m; + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); } }