Repository: samza Updated Branches: refs/heads/master 711dd8dc3 -> 1296c7ff9
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 1426444..5b3c3a0 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -198,12 +198,12 @@ public class TestMessageStreamImpl { public void testRepartition() { StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); - - String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0); + String mockOpName = "mockName"; + when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class); KVSerde mockKVSerde = mock(KVSerde.class); IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class); - when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde))) + when(mockGraph.getIntermediateStream(eq(mockOpName), eq(mockKVSerde))) .thenReturn(mockIntermediateStream); when(mockIntermediateStream.getOutputStream()) .thenReturn(mockOutputStreamImpl); @@ -211,7 +211,7 @@ public class TestMessageStreamImpl { MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); Function mockKeyFunction = mock(Function.class); Function mockValueFunction = mock(Function.class); - inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde); + inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde, "p1"); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); @@ -228,11 +228,11 @@ public class TestMessageStreamImpl { public void testRepartitionWithoutSerde() { StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); - - String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0); + String mockOpName = "mockName"; + when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class); IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class); - when(mockGraph.getIntermediateStream(eq(streamName), eq(null))) + when(mockGraph.getIntermediateStream(eq(mockOpName), eq(null))) .thenReturn(mockIntermediateStream); when(mockIntermediateStream.getOutputStream()) .thenReturn(mockOutputStreamImpl); @@ -240,7 +240,7 @@ public class TestMessageStreamImpl { MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); Function mockKeyFunction = mock(Function.class); Function mockValueFunction = mock(Function.class); - inputStream.partitionBy(mockKeyFunction, mockValueFunction); + inputStream.partitionBy(mockKeyFunction, mockValueFunction, "p1"); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); @@ -264,9 +264,10 @@ public class TestMessageStreamImpl { Supplier<Integer> initialValue = () -> 0; // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M) - Window<TestInputMessageEnvelope, String, Integer> window = Windows - .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, null, mock(Serde.class)); - MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window); + Window<TestInputMessageEnvelope, String, Integer> window = + Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, + null, mock(Serde.class)); + MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window, "w1"); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); @@ -289,7 +290,8 @@ public class TestMessageStreamImpl { mock(JoinFunction.class); Duration joinTtl = Duration.ofMinutes(1); - source1.join(source2, mockJoinFn, mock(Serde.class), mock(Serde.class), mock(Serde.class), joinTtl); + source1.join(source2, mockJoinFn, + mock(Serde.class), mock(Serde.class), mock(Serde.class), joinTtl, "j1"); ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture()); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java index 45583c2..e0152a0 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators; import junit.framework.Assert; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.data.TestMessageEnvelope; @@ -38,6 +39,8 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -363,15 +366,14 @@ public class TestStreamGraphImpl { ApplicationRunner mockRunner = mock(ApplicationRunner.class); Config mockConfig = mock(Config.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); - when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); - when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + String mockStreamName = "mockStreamName"; + when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); Serde mockValueSerde = mock(Serde.class); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream("test-stream-1", mockValueSerde); + graph.getIntermediateStream(mockStreamName, mockValueSerde); assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); @@ -387,9 +389,8 @@ public class TestStreamGraphImpl { ApplicationRunner mockRunner = mock(ApplicationRunner.class); Config mockConfig = mock(Config.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); - when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); - when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + String mockStreamName = "mockStreamName"; + when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); @@ -399,7 +400,7 @@ public class TestStreamGraphImpl { doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream("test-stream-1", mockKVSerde); + graph.getIntermediateStream(mockStreamName, mockKVSerde); assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); @@ -415,16 +416,15 @@ public class TestStreamGraphImpl { ApplicationRunner mockRunner = mock(ApplicationRunner.class); Config mockConfig = mock(Config.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); - when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); - when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + String mockStreamName = "mockStreamName"; + when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); Serde mockValueSerde = mock(Serde.class); graph.setDefaultSerde(mockValueSerde); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream("test-stream-1", null); + graph.getIntermediateStream(mockStreamName, null); assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); @@ -440,9 +440,8 @@ public class TestStreamGraphImpl { ApplicationRunner mockRunner = mock(ApplicationRunner.class); Config mockConfig = mock(Config.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); - when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); - when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + String mockStreamName = "mockStreamName"; + when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); @@ -453,7 +452,7 @@ public class TestStreamGraphImpl { doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); graph.setDefaultSerde(mockKVSerde); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream("test-stream-1", null); + graph.getIntermediateStream(mockStreamName, null); assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); @@ -469,13 +468,12 @@ public class TestStreamGraphImpl { ApplicationRunner mockRunner = mock(ApplicationRunner.class); Config mockConfig = mock(Config.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); - when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); - when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + String mockStreamName = "mockStreamName"; + when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream("test-stream-1", null); + graph.getIntermediateStream(mockStreamName, null); assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); @@ -499,9 +497,26 @@ public class TestStreamGraphImpl { @Test public void testGetNextOpIdIncrementsId() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - assertEquals(graph.getNextOpId(), 0); - assertEquals(graph.getNextOpId(), 1); + Config mockConfig = mock(Config.class); + when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); + when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + assertEquals("jobName-1234-merge-0", graph.getNextOpId(OpCode.MERGE, null)); + assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName")); + assertEquals("jobName-1234-map-2", graph.getNextOpId(OpCode.MAP, null)); + } + + @Test(expected = SamzaException.class) + public void testGetNextOpIdRejectsDuplicates() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); + when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName")); + graph.getNextOpId(OpCode.JOIN, "customName"); // should throw } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index aee457e..2140af1 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -26,6 +26,7 @@ import junit.framework.Assert; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.impl.store.TestInMemoryStore; @@ -58,6 +59,8 @@ import java.util.Collection; import java.util.List; import java.util.function.Function; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -71,6 +74,8 @@ public class TestWindowOperator { @Before public void setup() throws Exception { config = mock(Config.class); + when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); taskContext = mock(TaskContextImpl.class); runner = mock(ApplicationRunner.class); Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde()); @@ -79,7 +84,8 @@ public class TestWindowOperator { when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(taskContext.getStore("window-3")).thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); + when(taskContext.getStore("jobName-jobId-window-w1")) + .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka")); } @@ -93,7 +99,8 @@ public class TestWindowOperator { TestClock testClock = new TestClock(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); testClock.advanceTime(Duration.ofSeconds(1)); @@ -126,7 +133,8 @@ public class TestWindowOperator { StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); Assert.assertEquals(windowPanes.size(), 0); integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); @@ -150,7 +158,8 @@ public class TestWindowOperator { StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -176,7 +185,8 @@ public class TestWindowOperator { List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); @@ -222,7 +232,8 @@ public class TestWindowOperator { StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); task.init(config, taskContext); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); @@ -254,7 +265,8 @@ public class TestWindowOperator { task.init(config, taskContext); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); Assert.assertEquals(windowPanes.size(), 1); @@ -297,7 +309,8 @@ public class TestWindowOperator { task.init(config, taskContext); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); //assert that the count trigger fired @@ -351,7 +364,8 @@ public class TestWindowOperator { Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); TestClock testClock = new TestClock(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); @@ -403,9 +417,10 @@ public class TestWindowOperator { .map(kv -> new IntegerEnvelope(kv.getKey())); Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream - .map(m -> m) - .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode)) + .map(m -> m) + .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) + .setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); @@ -434,8 +449,9 @@ public class TestWindowOperator { Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) - .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode)) + .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde()) + .setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); @@ -463,7 +479,7 @@ public class TestWindowOperator { inStream .map(m -> m) .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) - .setAccumulationMode(mode)) + .setAccumulationMode(mode), "w1") .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 4a78da8..904367b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -209,7 +209,7 @@ public class TestOperatorImpl { private static class TestOpSpec extends OperatorSpec<Object, Object> { TestOpSpec() { - super(OpCode.INPUT, 1); + super(OpCode.INPUT, "1"); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 1c14fb4..47e55a8 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -71,6 +71,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -129,15 +130,19 @@ public class TestOperatorImplGraph { ApplicationRunner mockRunner = mock(ApplicationRunner.class); when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system")); - when(mockRunner.getStreamSpec(eq("null-null-partition_by-1"))) + when(mockRunner.getStreamSpec(eq("jobName-jobId-partition_by-p1"))) .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system")); - StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + Config mockConfig = mock(Config.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig); MessageStream<Object> inputStream = streamGraph.getInputStream("input"); OutputStream<KV<Integer, String>> outputStream = streamGraph .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); inputStream - .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))) + .partitionBy(Object::hashCode, Object::toString, + KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), "p1") .sendTo(outputStream); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); @@ -147,7 +152,7 @@ public class TestOperatorImplGraph { when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP); when(mockTaskContext.getJobModel()).thenReturn(jobModel); OperatorImplGraph opImplGraph = - new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class)); InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); assertEquals(1, inputOpImpl.registeredOperators.size()); @@ -215,22 +220,25 @@ public class TestOperatorImplGraph { ApplicationRunner mockRunner = mock(ApplicationRunner.class); when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system")); when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system")); - StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + Config mockConfig = mock(Config.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig); JoinFunction mockJoinFunction = mock(JoinFunction.class); MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>()); MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>()); inputStream1.join(inputStream2, mockJoinFunction, - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)); + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1"); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); KeyValueStore mockLeftStore = mock(KeyValueStore.class); - when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(mockLeftStore); + when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore); KeyValueStore mockRightStore = mock(KeyValueStore.class); - when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(mockRightStore); + when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore); OperatorImplGraph opImplGraph = - new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class)); // verify that join function is initialized once. verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class)); @@ -388,29 +396,30 @@ public class TestOperatorImplGraph { when(runner.getStreamSpec("output2")).thenReturn(output2); // intermediate streams used in tests - StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", "test-app-1-partition_by-10", "default-system"); - StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", "test-app-1-partition_by-6", "default-system"); - when(runner.getStreamSpec("test-app-1-partition_by-10")) - .thenReturn(int1); - when(runner.getStreamSpec("test-app-1-partition_by-6")) - .thenReturn(int2); + StreamSpec int1 = new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"); + StreamSpec int2 = new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"); + when(runner.getStreamSpec("test-app-1-partition_by-p2")).thenReturn(int1); + when(runner.getStreamSpec("test-app-1-partition_by-p1")).thenReturn(int2); StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m -> m); MessageStream messageStream2 = streamGraph.getInputStream("input2").filter(m -> true); MessageStream messageStream3 = - streamGraph.getInputStream("input3").filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m); + streamGraph.getInputStream("input3") + .filter(m -> true) + .partitionBy(m -> "hehe", m -> m, "p1") + .map(m -> m); OutputStream<Object> outputStream1 = streamGraph.getOutputStream("output1"); OutputStream<Object> outputStream2 = streamGraph.getOutputStream("output2"); messageStream1 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2)) - .partitionBy(m -> "haha", m -> m) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") + .partitionBy(m -> "haha", m -> m, "p2") .sendTo(outputStream1); messageStream3 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") .sendTo(outputStream2); Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java index f1fb8e2..65f1dc6 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -49,7 +49,7 @@ public class TestWindowOperatorSpec { window.setEarlyTrigger(earlyTrigger); window.setLateTrigger(lateTrigger); - WindowOperatorSpec spec = new WindowOperatorSpec(window, 0); + WindowOperatorSpec spec = new WindowOperatorSpec(window, "0"); Assert.assertEquals(spec.getDefaultTriggerMs(), 5); } @@ -62,7 +62,7 @@ public class TestWindowOperatorSpec { null, WindowType.SESSION, null, null, mock(Serde.class)); window.setEarlyTrigger(earlyTrigger); - WindowOperatorSpec spec = new WindowOperatorSpec(window, 0); + WindowOperatorSpec spec = new WindowOperatorSpec(window, "0"); Assert.assertEquals(spec.getDefaultTriggerMs(), 150); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index d2f0184..29c509d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -93,7 +93,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { final StreamApplication app = (streamGraph, cfg) -> { streamGraph.<KV<String, PageView>>getInputStream("PageView") .map(Values.create()) - .partitionBy(pv -> pv.getMemberId(), pv -> pv) + .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") .sink((m, collector, coordinator) -> { received.add(m.getValue()); }); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 7da0e77..dda3d24 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -145,7 +145,7 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { final StreamApplication app = (streamGraph, cfg) -> { streamGraph.<KV<String, PageView>>getInputStream("PageView") .map(EndOfStreamIntegrationTest.Values.create()) - .partitionBy(pv -> pv.getMemberId(), pv -> pv) + .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") .sink((m, collector, coordinator) -> { received.add(m.getValue()); }); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java index e35dfb7..346e958 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -52,23 +52,26 @@ public class RepartitionJoinWindowApp implements StreamApplication { graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde())); MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews - .partitionBy(PageView::getViewId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class))) + .partitionBy(PageView::getViewId, pv -> pv, + new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId") .map(KV::getValue); MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks - .partitionBy(AdClick::getViewId, ac -> ac, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class))) + .partitionBy(AdClick::getViewId, ac -> ac, + new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId") .map(KV::getValue); MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(), new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class), - Duration.ofMinutes(1)); + Duration.ofMinutes(1), "pageViewAdClickJoin"); userPageAdClicks .partitionBy(UserPageAdClick::getUserId, upac -> upac, - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class))) + KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId") .map(KV::getValue) - .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3), new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class))) + .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3), + new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow") .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size()))) .sendTo(outputStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java index 6410e7d..997127e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -50,8 +50,8 @@ public class SessionWindowApp implements StreamApplication { pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) - .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3), new StringSerde(), - new JsonSerdeV2<>(PageView.class))) + .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3), + new StringSerde(), new JsonSerdeV2<>(PageView.class)), "sessionWindow") .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java index 5d04f21..5d2a17c 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -51,7 +51,8 @@ public class TumblingWindowApp implements StreamApplication { pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) - .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3), null, null)) + .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3), + new StringSerde(), new JsonSerdeV2<>(PageView.class)), "tumblingWindow") .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); }
