Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 [created] 373048aa0
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java new file mode 100644 index 0000000..5f659ba --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import java.lang.reflect.Field; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.OperatorGraph; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * Unit test for {@link StreamOperatorTask} + */ +public class TestFluentStreamTasks { + + private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { + for (int i = 0; i < 4; i++) { + this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i))); + } + } }; + + @Test + public void testUserTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + WindowGraph userTask = new WindowGraph(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + + @Test + public void testSplitTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + BroadcastGraph splitTask = new BroadcastGraph(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + + @Test + public void testJoinTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + JoinGraph joinTask = new JoinGraph(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java new file mode 100644 index 0000000..a365411 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of a simple user-defined tasks w/ window operators + * + */ +public class WindowGraph implements StreamGraphFactory { + class MessageType { + String field1; + String field2; + } + + private final Set<SystemStreamPartition> inputs; + + WindowGraph(Set<SystemStreamPartition> inputs) { + this.inputs = inputs; + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + @Override + public StreamGraph create(Config config) { + StreamGraphImpl graph = new StreamGraphImpl(); + BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; + inputs.forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return source.getSystemStream(); + } + + @Override public Properties getProperties() { + return null; + } + }, null, null). + map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), + m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); + + return graph; + } + + String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { + return m.getKey().toString(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java deleted file mode 100644 index 663d98c..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators; - - -import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.Map; -import java.util.function.BiFunction; - - -/** - * Example implementation of split stream tasks - * - */ -public class BroadcastTask implements StreamOperatorTask { - class MessageType { - String field1; - String field2; - String field3; - String field4; - String parKey; - private long timestamp; - - public long getTimestamp() { - return this.timestamp; - } - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) { - BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; - messageStreams.values().forEach(entry -> { - MessageStream<JsonMessageEnvelope> inputStream = entry.map(this::getInputMessage); - - inputStream.filter(this::myFilter1) - .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter1) - .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter1) - .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - }); - } - - JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope m1) { - return (JsonMessageEnvelope) m1.getMessage(); - } - - boolean myFilter1(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key1"); - } - - boolean myFilter2(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key2"); - } - - boolean myFilter3(JsonMessageEnvelope m1) { - return m1.getMessage().parKey.equals("key3"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java deleted file mode 100644 index 1b10609..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators; - -import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -/** - * Example implementation of unique key-based stream-stream join tasks - * - */ -public class JoinTask implements StreamOperatorTask { - class MessageType { - String joinKey; - List<String> joinFields = new ArrayList<>(); - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - MessageStream<JsonMessageEnvelope> joinOutput = null; - - @Override - public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) { - messageStreams.values().forEach(messageStream -> { - MessageStream<JsonMessageEnvelope> newSource = messageStream.map(this::getInputMessage); - if (joinOutput == null) { - joinOutput = newSource; - } else { - joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2)); - } - }); - } - - private JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope ism) { - return new JsonMessageEnvelope( - ((MessageType) ism.getMessage()).joinKey, - (MessageType) ism.getMessage(), - ism.getOffset(), - ism.getSystemStreamPartition()); - } - - JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { - MessageType newJoinMsg = new MessageType(); - newJoinMsg.joinKey = m1.getKey(); - newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); - newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); - return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java deleted file mode 100644 index 61bb32a..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.Partition; -import org.apache.samza.config.Config; -import org.apache.samza.operators.impl.OperatorImpl; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.TaskContext; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import java.lang.reflect.Field; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -public class TestFluentStreamAdaptorTask { - Field userTaskField = null; - Field operatorChainsField = null; - - @Before - public void prep() throws NoSuchFieldException { - userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask"); - operatorChainsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - userTaskField.setAccessible(true); - operatorChainsField.setAccessible(true); - } - - @Test - public void testConstructor() throws IllegalAccessException { - StreamOperatorTask userTask = mock(StreamOperatorTask.class); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); - StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask); - Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask); - assertEquals(taskMemberVar, userTask); - assertTrue(chainsMap.isEmpty()); - } - - @Test - public void testInit() throws Exception { - StreamOperatorTask userTask = mock(StreamOperatorTask.class); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - Set<SystemStreamPartition> testInputs = new HashSet() { { - this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0))); - this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1))); - } }; - when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs); - adaptorTask.init(mockConfig, mockContext); - verify(userTask, times(1)).transform(Mockito.anyMap()); - Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask); - assertTrue(chainsMap.size() == 2); - assertTrue(chainsMap.containsKey(testInputs.toArray()[0])); - assertTrue(chainsMap.containsKey(testInputs.toArray()[1])); - } - - // TODO: window and process methods to be added after implementation of ChainedOperators.create() -} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java deleted file mode 100644 index d804bf8..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.Partition; -import org.apache.samza.config.Config; - -import org.apache.samza.operators.impl.OperatorImpl; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.TaskContext; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -/** - * Unit test for {@link StreamOperatorTask} - */ -public class TestFluentStreamTasks { - - private final WindowTask userTask = new WindowTask(); - - private final BroadcastTask splitTask = new BroadcastTask(); - - private final JoinTask joinTask = new JoinTask(); - - private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { - for (int i = 0; i < 4; i++) { - this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i))); - } - } }; - - @Test - public void testUserTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask); - Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - pipelineMapFld.setAccessible(true); - Map<SystemStreamPartition, OperatorImpl> pipelineMap = - (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - assertEquals(pipelineMap.size(), 4); - this.inputPartitions.forEach(partition -> { - assertNotNull(pipelineMap.get(partition)); - }); - } - - @Test - public void testSplitTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask); - Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - pipelineMapFld.setAccessible(true); - Map<SystemStreamPartition, OperatorImpl> pipelineMap = - (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - assertEquals(pipelineMap.size(), 4); - this.inputPartitions.forEach(partition -> { - assertNotNull(pipelineMap.get(partition)); - }); - } - - @Test - public void testJoinTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask); - Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - pipelineMapFld.setAccessible(true); - Map<SystemStreamPartition, OperatorImpl> pipelineMap = - (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - assertEquals(pipelineMap.size(), 4); - this.inputPartitions.forEach(partition -> { - assertNotNull(pipelineMap.get(partition)); - }); - } - - -} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 5991e2f..d5607d8 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -21,6 +21,7 @@ package org.apache.samza.operators; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.KeyValueJoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; @@ -29,6 +30,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; @@ -46,17 +48,19 @@ import static org.mockito.Mockito.when; public class TestMessageStreamImpl { + private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + @Test public void testMap() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(); - MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = - m -> new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1); + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) -> + new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1); MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap); Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next(); assertTrue(mapOp instanceof StreamOperatorSpec); - assertEquals(mapOp.getOutputStream(), outputStream); + assertEquals(mapOp.getNextStream(), outputStream); // assert that the transformation function is what we defined above TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class); TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); @@ -73,33 +77,33 @@ public class TestMessageStreamImpl { @Test public void testFlatMap() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(); + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { { this.add(mock(TestOutputMessageEnvelope.class)); this.add(mock(TestOutputMessageEnvelope.class)); this.add(mock(TestOutputMessageEnvelope.class)); } }; - FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = m -> flatOuts; + FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts; MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap); Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next(); assertTrue(flatMapOp instanceof StreamOperatorSpec); - assertEquals(flatMapOp.getOutputStream(), outputStream); + assertEquals(flatMapOp.getNextStream(), outputStream); // assert that the transformation function is what we defined above assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap); } @Test public void testFilter() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(); - FilterFunction<TestMessageEnvelope> xFilter = m -> m.getMessage().getEventTime() > 123456L; + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L; MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter); Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next(); assertTrue(filterOp instanceof StreamOperatorSpec); - assertEquals(filterOp.getOutputStream(), outputStream); + assertEquals(filterOp.getNextStream(), outputStream); // assert that the transformation function is what we defined above FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn(); TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); @@ -117,8 +121,8 @@ public class TestMessageStreamImpl { @Test public void testSink() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(); - SinkFunction<TestMessageEnvelope> xSink = (m, mc, tc) -> { + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> { mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); }; @@ -128,26 +132,28 @@ public class TestMessageStreamImpl { OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next(); assertTrue(sinkOp instanceof SinkOperatorSpec); assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink); - assertNull(((SinkOperatorSpec) sinkOp).getOutputStream()); + assertNull(((SinkOperatorSpec) sinkOp).getNextStream()); } @Test public void testJoin() { - MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(); - MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(); - JoinFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner = - (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); + MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph); + MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph); + JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner = + (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>) + (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); + MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner); Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next(); assertTrue(joinOp1 instanceof PartialJoinOperatorSpec); - assertEquals(((PartialJoinOperatorSpec) joinOp1).getOutputStream(), joinOutput); + assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput); subs = source2.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next(); assertTrue(joinOp2 instanceof PartialJoinOperatorSpec); - assertEquals(((PartialJoinOperatorSpec) joinOp2).getOutputStream(), joinOutput); + assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput); TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L); TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L); TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2); @@ -160,10 +166,10 @@ public class TestMessageStreamImpl { @Test public void testMerge() { - MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(); + MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph); Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { { - this.add(new MessageStreamImpl<>()); - this.add(new MessageStreamImpl<>()); + this.add(new MessageStreamImpl<>(mockGraph)); + this.add(new MessageStreamImpl<>(mockGraph)); } }; MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others); validateMergeOperator(merge1, mergeOutput); @@ -176,7 +182,7 @@ public class TestMessageStreamImpl { assertEquals(subs.size(), 1); OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next(); assertTrue(mergeOp instanceof StreamOperatorSpec); - assertEquals(((StreamOperatorSpec) mergeOp).getOutputStream(), mergeOutput); + assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput); TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg); assertEquals(outputs.size(), 1); http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java new file mode 100644 index 0000000..c4e9f51 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + + +public class TestMessageStreamImplUtil { + public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) { + return new MessageStreamImpl<M>(graph); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java deleted file mode 100644 index e176063..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators; - -import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.Map; -import java.util.function.BiFunction; - - -/** - * Example implementation of a simple user-defined tasks w/ window operators - * - */ -public class WindowTask implements StreamOperatorTask { - class MessageType { - String field1; - String field2; - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) { - BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; - messageStreams.values().forEach(source -> - source.map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), m1.getSystemStreamPartition())) - .window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)) - ); - } - - String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { - return m.getKey().toString(); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index bae98e3..ec63d41 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -18,17 +18,26 @@ */ package org.apache.samza.operators.impl; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.TestMessageEnvelope; +import org.apache.samza.operators.TestMessageStreamImplUtil; import org.apache.samza.operators.TestOutputMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.KeyValueJoinFunction; +import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.task.TaskContext; import org.junit.Before; import org.junit.Test; @@ -38,7 +47,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.Set; -import java.util.function.BiFunction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; @@ -49,25 +57,46 @@ import static org.mockito.Mockito.when; public class TestOperatorImpls { Field nextOperatorsField = null; + Method createOpMethod = null; + Method createOpsMethod = null; @Before - public void prep() throws NoSuchFieldException { + public void prep() throws NoSuchFieldException, NoSuchMethodException { nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators"); nextOperatorsField.setAccessible(true); + + createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class, + OperatorSpec.class, Config.class, TaskContext.class); + createOpMethod.setAccessible(true); + + createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class); + createOpsMethod.setAccessible(true); } - + @Test - public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException { + public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException { // get window operator WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class); - OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = OperatorImpls.createOperatorImpl(mockWnd); + WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null); + when(mockWnd.getWindow()).thenReturn(windowInternal); + MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class); + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + + OperatorGraph opGraph = new OperatorGraph(); + OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) + createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext); assertTrue(opImpl instanceof WindowOperatorImpl); + Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window"); + wndInternalField.setAccessible(true); + WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl); + assertEquals(wndInternal, windowInternal); // get simple operator StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class); when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn); - opImpl = OperatorImpls.createOperatorImpl(mockSimpleOp); + opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext); assertTrue(opImpl instanceof StreamOperatorImpl); Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn"); txfmFnField.setAccessible(true); @@ -77,7 +106,7 @@ public class TestOperatorImpls { SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { }; SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); when(sinkOp.getSinkFn()).thenReturn(sinkFn); - opImpl = OperatorImpls.createOperatorImpl(sinkOp); + opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext); assertTrue(opImpl instanceof SinkOperatorImpl); Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn"); sinkFnField.setAccessible(true); @@ -86,28 +115,33 @@ public class TestOperatorImpls { // get join operator PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class); TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class); - BiFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = (m1, m2) -> mockOutput; + PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class); when(joinOp.getTransformFn()).thenReturn(joinFn); - opImpl = OperatorImpls.createOperatorImpl(joinOp); + opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext); assertTrue(opImpl instanceof PartialJoinOperatorImpl); } @Test - public void testEmptyChain() { + public void testEmptyChain() throws InvocationTargetException, IllegalAccessException { // test creation of empty chain - MessageStreamImpl<TestMessageEnvelope> testStream = new MessageStreamImpl<>(); + MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class); TaskContext mockContext = mock(TaskContext.class); - RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testStream, mockContext); + Config mockConfig = mock(Config.class); + OperatorGraph opGraph = new OperatorGraph(); + RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext); assertTrue(operatorChain != null); } @Test - public void testLinearChain() throws IllegalAccessException { + public void testLinearChain() throws IllegalAccessException, InvocationTargetException { // test creation of linear chain - MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>(); + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); - testInput.map(m -> m).window(Windows.tumblingWindow(Duration.ofMillis(1000))); - RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext); + Config mockConfig = mock(Config.class); + testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10))); + OperatorGraph opGraph = new OperatorGraph(); + RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext); Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain); assertEquals(subsSet.size(), 1); OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next(); @@ -119,13 +153,16 @@ public class TestOperatorImpls { } @Test - public void testBroadcastChain() throws IllegalAccessException { + public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException { // test creation of broadcast chain - MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>(); + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + Config mockConfig = mock(Config.class); testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } }); testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m); - RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext); + OperatorGraph opGraph = new OperatorGraph(); + RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext); Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain); assertEquals(subsSet.size(), 2); Iterator<OperatorImpl> iter = subsSet.iterator(); @@ -146,18 +183,23 @@ public class TestOperatorImpls { } @Test - public void testJoinChain() throws IllegalAccessException { + public void testJoinChain() throws IllegalAccessException, InvocationTargetException { // test creation of join chain - MessageStreamImpl<TestMessageEnvelope> input1 = new MessageStreamImpl<>(); - MessageStreamImpl<TestMessageEnvelope> input2 = new MessageStreamImpl<>(); + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); + MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + Config mockConfig = mock(Config.class); input1 - .join(input2, (m1, m2) -> - new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length())) + .join(input2, + (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>) (m1, m2) -> + new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()) + ) .map(m -> m); + OperatorGraph opGraph = new OperatorGraph(); // now, we create chained operators from each input sources - RootOperatorImpl chain1 = OperatorImpls.createOperatorImpls(input1, mockContext); - RootOperatorImpl chain2 = OperatorImpls.createOperatorImpls(input2, mockContext); + RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext); + RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext); // check that those two chains will merge at map operator // first branch of the join Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1); http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index ba5b6f8..ce9fdd2 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -18,17 +18,16 @@ */ package org.apache.samza.operators.impl; +import org.apache.samza.config.Config; import org.apache.samza.operators.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class TestSinkOperatorImpl { @@ -38,7 +37,9 @@ public class TestSinkOperatorImpl { SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class); when(sinkOp.getSinkFn()).thenReturn(sinkFn); - SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp); + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext); TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class); MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index 5a3840c..010a210 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -18,22 +18,20 @@ */ package org.apache.samza.operators.impl; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.TestMessageEnvelope; import org.apache.samza.operators.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import java.util.ArrayList; -import java.util.Collection; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class TestStreamOperatorImpl { @@ -43,8 +41,10 @@ public class TestStreamOperatorImpl { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); - - StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp)); + MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class); + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext)); TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class); TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class); Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { { http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java index ffe9df1..31257a4 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -18,14 +18,18 @@ */ package org.apache.samza.operators.spec; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.TestMessageEnvelope; +import org.apache.samza.operators.TestMessageStreamImplUtil; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.windows.internal.WindowInternal; -import org.apache.samza.operators.windows.WindowKey; import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; import org.junit.Test; import java.util.ArrayList; @@ -42,19 +46,23 @@ public class TestOperatorSpecs { @Test public void testGetStreamOperator() { FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { - this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L)); - } }; - StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn); + this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L)); + } }; + MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class); + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput); assertEquals(strmOp.getTransformFn(), transformFn); - assertTrue(strmOp.getOutputStream() instanceof MessageStreamImpl); + assertEquals(strmOp.getNextStream(), mockOutput); } @Test public void testGetSinkOperator() { - SinkFunction<TestMessageEnvelope> sinkFn = (m, c, t) -> { }; - SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn); + SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector, + TaskCoordinator taskCoordinator) -> { }; + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph); assertEquals(sinkOp.getSinkFn(), sinkFn); - assertTrue(sinkOp.getOutputStream() == null); + assertTrue(sinkOp.getNextStream() == null); } @Test @@ -65,8 +73,9 @@ public class TestOperatorSpecs { //instantiate a window using reflection WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null); - WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, WindowKey<String>, Integer, - WindowPane<WindowKey<String>, Integer>>createWindowOperatorSpec(window); + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class); + WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut); assertEquals(spec.getWindow(), window); assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor); assertEquals(spec.getWindow().getFoldFunction(), aggregator); @@ -74,13 +83,30 @@ public class TestOperatorSpecs { @Test public void testGetPartialJoinOperator() { - BiFunction<MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger = - (m1, m2) -> new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime()); - MessageStreamImpl<TestMessageEnvelope> joinOutput = new MessageStreamImpl<>(); + PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger = + new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() { + @Override + public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) { + return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime()); + } + + @Override + public Object getKey(MessageEnvelope<Object, ?> message) { + return message.getKey(); + } + + @Override + public Object getOtherKey(MessageEnvelope<Object, ?> message) { + return message.getKey(); + } + }; + + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin = - OperatorSpecs.createPartialJoinOperatorSpec(merger, joinOutput); + OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput); - assertEquals(partialJoin.getOutputStream(), joinOutput); + assertEquals(partialJoin.getNextStream(), joinOutput); MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class); MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class); assertEquals(partialJoin.getTransformFn(), merger); @@ -88,13 +114,14 @@ public class TestOperatorSpecs { @Test public void testGetMergeOperator() { - MessageStreamImpl<TestMessageEnvelope> output = new MessageStreamImpl<>(); - StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(output); + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); + StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output); Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { { this.add(t); } }; TestMessageEnvelope t = mock(TestMessageEnvelope.class); assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t)); - assertEquals(mergeOp.getOutputStream(), output); + assertEquals(mergeOp.getNextStream(), output); } }
