http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java deleted file mode 100644 index 1c30a21..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import java.time.Duration; -import java.util.Set; -import java.util.function.Supplier; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStreamPartition; - - -/** - * Example implementation of split stream tasks - * - */ -public class TestBroadcastExample extends TestExampleBase { - - TestBroadcastExample(Set<SystemStreamPartition> inputs) { - super(inputs); - } - - class MessageType { - String field1; - String field2; - String field3; - String field4; - String parKey; - private long timestamp; - - public long getTimestamp() { - return this.timestamp; - } - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - public void init(StreamGraph graph, Config config) { - FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1; - Supplier<Integer> initialValue = () -> 0; - - inputs.keySet().forEach(entry -> { - MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream( - new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage); - - inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - }); - } - - JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) { - return (JsonMessageEnvelope) m1.getMessage(); - } - - boolean myFilter1(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key1"); - } - - boolean myFilter2(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key2"); - } - - boolean myFilter3(JsonMessageEnvelope m1) { - return m1.getMessage().parKey.equals("key3"); - } - -}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java deleted file mode 100644 index dd661a0..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Base class for test examples - * - */ -public abstract class TestExampleBase implements StreamApplication { - - protected final Map<SystemStream, Set<SystemStreamPartition>> inputs; - - TestExampleBase(Set<SystemStreamPartition> inputs) { - this.inputs = new HashMap<>(); - for (SystemStreamPartition input : inputs) { - this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>()); - this.inputs.get(input.getSystemStream()).add(input); - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java deleted file mode 100644 index 6c9f8c2..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - - -/** - * Example implementation of unique key-based stream-stream join tasks - * - */ -public class TestJoinExample extends TestExampleBase { - - TestJoinExample(Set<SystemStreamPartition> inputs) { - super(inputs); - } - - class MessageType { - String joinKey; - List<String> joinFields = new ArrayList<>(); - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - MessageStream<JsonMessageEnvelope> joinOutput = null; - - @Override - public void init(StreamGraph graph, Config config) { - - for (SystemStream input : inputs.keySet()) { - StreamSpec inputStreamSpec = new StreamSpec(input.getSystem() + "-" + input.getStream(), input.getStream(), input.getSystem()); - MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream( - inputStreamSpec, null, null).map(this::getInputMessage); - if (joinOutput == null) { - joinOutput = newSource; - } else { - joinOutput = joinOutput.join(newSource, new MyJoinFunction(), Duration.ofMinutes(1)); - } - } - - joinOutput.sendTo(graph.createOutStream( - new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"), - new StringSerde("UTF-8"), new JsonSerde<>())); - - } - - private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { - return new JsonMessageEnvelope( - ((MessageType) ism.getMessage()).joinKey, - (MessageType) ism.getMessage(), - ism.getOffset(), - ism.getSystemStreamPartition()); - } - - class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> { - JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { - MessageType newJoinMsg = new MessageType(); - newJoinMsg.joinKey = m1.getKey(); - newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); - newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); - return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); - } - - @Override - public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) { - return this.myJoinResult(message, otherMessage); - } - - @Override - public String getFirstKey(JsonMessageEnvelope message) { - return message.getKey(); - } - - @Override - public String getSecondKey(JsonMessageEnvelope message) { - return message.getKey(); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java deleted file mode 100644 index c88df7c..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.Set; -import java.util.function.Supplier; - - -/** - * Example implementation of a simple user-defined tasks w/ window operators - * - */ -public class TestWindowExample extends TestExampleBase { - class MessageType { - String field1; - String field2; - } - - TestWindowExample(Set<SystemStreamPartition> inputs) { - super(inputs); - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - public void init(StreamGraph graph, Config config) { - FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1; - Supplier<Integer> initialValue = () -> 0; - inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream( - new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null). - map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), - m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator))); - - } - - String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { - return m.getKey().toString(); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/WindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java new file mode 100644 index 0000000..159dba2 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.util.CommandLine; + +import java.time.Duration; +import java.util.function.Supplier; + + +/** + * Example implementation of a simple user-defined task w/ a window operator. + * + */ +public class WindowExample implements StreamApplication { + + @Override + public void init(StreamGraph graph, Config config) { + Supplier<Integer> initialValue = () -> 0; + FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1; + MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m); + OutputStream<String, Integer, WindowPane<Void, Integer>> outputStream = graph + .getOutputStream("outputStream", m -> m.getKey().getPaneId(), m -> m.getMessage()); + + // create a tumbling window that outputs the number of message collected every 10 minutes. + // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive + // for 1 minute. + inputStream + .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1))))) + .sendTo(outputStream); + } + + // local execution mode + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + localRunner.run(new WindowExample()); + } + + class PageViewEvent { + String key; + long timestamp; + + public PageViewEvent(String key, long timestamp) { + this.key = key; + this.timestamp = timestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index e524ba1..e661798 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -19,56 +19,52 @@ package org.apache.samza.execution; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; import org.apache.samza.Partition; -import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.job.ApplicationStatus; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.runtime.AbstractApplicationRunner; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestExecutionPlanner { - private Config config; - private static final String DEFAULT_SYSTEM = "test-system"; private static final int DEFAULT_PARTITIONS = 10; + private Map<String, SystemAdmin> systemAdmins; + private StreamManager streamManager; + private ApplicationRunner runner; + private Config config; + private StreamSpec input1; private StreamSpec input2; private StreamSpec input3; private StreamSpec output1; private StreamSpec output2; - private Map<String, SystemAdmin> systemAdmins; - private StreamManager streamManager; - - private ApplicationRunner runner; - private JoinFunction createJoin() { return new JoinFunction() { @Override @@ -88,14 +84,6 @@ public class TestExecutionPlanner { }; } - private SinkFunction createSink() { - return new SinkFunction() { - @Override - public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) { - } - }; - } - private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) { return new SystemAdmin() { @@ -139,37 +127,43 @@ public class TestExecutionPlanner { }; } - private StreamGraph createSimpleGraph() { + private StreamGraphImpl createSimpleGraph() { /** * a simple graph of partitionBy and map * * input1 -> partitionBy -> map -> output1 * */ - StreamGraph streamGraph = new StreamGraphImpl(runner, config); - streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null)); + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null); + streamGraph.getInputStream("input1", null) + .partitionBy(m -> "yes!!!").map(m -> m) + .sendTo(output1); return streamGraph; } - private StreamGraph createStreamGraphWithJoin() { + private StreamGraphImpl createStreamGraphWithJoin() { - /** the graph looks like the following + /** + * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value. * - * input1 -> map -> join -> output1 - * | - * input2 -> partitionBy -> filter -| - * | - * input3 -> filter -> partitionBy -> map -> join -> output2 + * input1 (64) -> map -> join -> output1 (8) + * | + * input2 (16) -> partitionBy ("64") -> filter -| + * | + * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16) * */ - StreamGraph streamGraph = new StreamGraphImpl(runner, config); - MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m); - MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m); + MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true); + MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null); + OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", null, null); - m1.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output1, null, null)); - m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output2, null, null)); + m1.join(m2, createJoin(), Duration.ofHours(2)).sendTo(output1); + m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(output2); return streamGraph; } @@ -205,27 +199,26 @@ public class TestExecutionPlanner { systemAdmins.put("system2", systemAdmin2); streamManager = new StreamManager(systemAdmins); - runner = new AbstractApplicationRunner(config) { - @Override - public void run(StreamApplication streamApp) { - } - - @Override - public void kill(StreamApplication streamApp) { - - } - - @Override - public ApplicationStatus status(StreamApplication streamApp) { - return null; - } - }; + runner = mock(ApplicationRunner.class); + when(runner.getStreamSpec("input1")).thenReturn(input1); + when(runner.getStreamSpec("input2")).thenReturn(input2); + when(runner.getStreamSpec("input3")).thenReturn(input3); + when(runner.getStreamSpec("output1")).thenReturn(output1); + when(runner.getStreamSpec("output2")).thenReturn(output2); + + // intermediate streams used in tests + when(runner.getStreamSpec("test-app-1-partition_by-0")) + .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-1")) + .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-4")) + .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system")); } @Test public void testCreateProcessorGraph() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraph streamGraph = createStreamGraphWithJoin(); + StreamGraphImpl streamGraph = createStreamGraphWithJoin(); JobGraph jobGraph = planner.createJobGraph(streamGraph); assertTrue(jobGraph.getSources().size() == 3); @@ -236,7 +229,7 @@ public class TestExecutionPlanner { @Test public void testFetchExistingStreamPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraph streamGraph = createStreamGraphWithJoin(); + StreamGraphImpl streamGraph = createStreamGraphWithJoin(); JobGraph jobGraph = planner.createJobGraph(streamGraph); ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager); @@ -254,7 +247,7 @@ public class TestExecutionPlanner { @Test public void testCalculateJoinInputPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraph streamGraph = createStreamGraphWithJoin(); + StreamGraphImpl streamGraph = createStreamGraphWithJoin(); JobGraph jobGraph = planner.createJobGraph(streamGraph); ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager); @@ -273,7 +266,7 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraph streamGraph = createSimpleGraph(); + StreamGraphImpl streamGraph = createSimpleGraph(); JobGraph jobGraph = planner.createJobGraph(streamGraph); planner.calculatePartitions(streamGraph, jobGraph); @@ -286,7 +279,7 @@ public class TestExecutionPlanner { @Test public void testCalculateIntStreamPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraph streamGraph = createSimpleGraph(); + StreamGraphImpl streamGraph = createSimpleGraph(); JobGraph jobGraph = planner.createJobGraph(streamGraph); planner.calculatePartitions(streamGraph, jobGraph); http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 4e6c750..acc8588 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; @@ -48,7 +47,6 @@ import static org.mockito.Mockito.when; public class TestJoinOperator { private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); - private final ApplicationRunner runner = mock(ApplicationRunner.class); private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @Test @@ -226,6 +224,10 @@ public class TestJoinOperator { } private StreamOperatorTask createStreamOperatorTask() throws Exception { + ApplicationRunner runner = mock(ApplicationRunner.class); + when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem")); + when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2")); + TaskContext taskContext = mock(TaskContext.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), @@ -239,13 +241,12 @@ public class TestJoinOperator { } private class TestStreamApplication implements StreamApplication { - StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem"); - StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2"); - @Override public void init(StreamGraph graph, Config config) { - MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null); - MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null); + MessageStream<FirstStreamIME> inStream = + graph.getInputStream("instream", FirstStreamIME::new); + MessageStream<SecondStreamIME> inStream2 = + graph.getInputStream("instream2", SecondStreamIME::new); SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); inStream @@ -256,22 +257,20 @@ public class TestJoinOperator { } } - private class TestJoinFunction - implements JoinFunction<Integer, MessageEnvelope<Integer, Integer>, MessageEnvelope<Integer, Integer>, Integer> { + private class TestJoinFunction implements JoinFunction<Integer, FirstStreamIME, SecondStreamIME, Integer> { @Override - public Integer apply(MessageEnvelope<Integer, Integer> message, - MessageEnvelope<Integer, Integer> otherMessage) { - return message.getMessage() + otherMessage.getMessage(); + public Integer apply(FirstStreamIME message, SecondStreamIME otherMessage) { + return (Integer) message.getMessage() + (Integer) otherMessage.getMessage(); } @Override - public Integer getFirstKey(MessageEnvelope<Integer, Integer> message) { - return message.getKey(); + public Integer getFirstKey(FirstStreamIME message) { + return (Integer) message.getKey(); } @Override - public Integer getSecondKey(MessageEnvelope<Integer, Integer> message) { - return message.getKey(); + public Integer getSecondKey(SecondStreamIME message) { + return (Integer) message.getKey(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 8a2dd95..e815b81 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -18,25 +18,19 @@ */ package org.apache.samza.operators; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -45,6 +39,15 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -138,7 +141,6 @@ public class TestMessageStreamImpl { OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next(); assertTrue(sinkOp instanceof SinkOperatorSpec); assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink); - assertNull(((SinkOperatorSpec) sinkOp).getNextStream()); } @Test @@ -220,8 +222,8 @@ public class TestMessageStreamImpl { MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph); Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222"; inputStream.partitionBy(keyExtractorFunc); - assertTrue(streamGraph.getInStreams().size() == 1); - assertTrue(streamGraph.getOutStreams().size() == 1); + assertTrue(streamGraph.getInputStreams().size() == 1); + assertTrue(streamGraph.getOutputStreams().size() == 1); Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); @@ -229,11 +231,7 @@ public class TestMessageStreamImpl { assertTrue(partitionByOp instanceof SinkOperatorSpec); assertNull(partitionByOp.getNextStream()); - ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000), new MessageCollector() { - @Override - public void send(OutgoingMessageEnvelope envelope) { - assertTrue(envelope.getPartitionKey().equals("222")); - } - }, null); + ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000), + envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java new file mode 100644 index 0000000..6603137 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators; + + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import junit.framework.Assert; +import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.triggers.FiringType; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.testUtils.TestClock; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestWindowOperator { + private final MessageCollector messageCollector = mock(MessageCollector.class); + private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); + private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>(); + private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); + private Config config; + private TaskContext taskContext; + private ApplicationRunner runner; + + @Before + public void setup() throws Exception { + windowPanes.clear(); + + config = mock(Config.class); + taskContext = mock(TaskContext.class); + runner = mock(ApplicationRunner.class); + when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka")); + } + + @Test + public void testTumblingWindowsDiscardingMode() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 5); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3)); + Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1); + } + + @Test + public void testTumblingWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 7); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4); + } + + @Test + public void testSessionWindowsDiscardingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001"); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001"); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001"); + Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2); + + } + + @Test + public void testSessionWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4); + } + + @Test + public void testCancellationOfOnceTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, + Duration.ofSeconds(1), Triggers.count(2)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + + task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1); + + } + + @Test + public void testCancellationOfAnyTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + Assert.assertEquals(windowPanes.size(), 1); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5); + + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + + //advance timer by > 500 millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(900)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000"); + } + + @Test + public void testCancelationOfRepeatingNestedTriggers() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofMillis(500)); + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 3); + + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 4); + } + + private class KeyedTumblingWindowStreamApplication implements StreamApplication { + + private final AccumulationMode mode; + private final Duration duration; + private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger; + + KeyedTumblingWindowStreamApplication(AccumulationMode mode, + Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) { + this.mode = mode; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream", + (k, m) -> new MessageEnvelope(k, m)); + Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); + inStream + .map(m -> m) + .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode)) + .map(m -> { + windowPanes.add(m); + return m; + }); + } + } + + private class KeyedSessionWindowStreamApplication implements StreamApplication { + + private final AccumulationMode mode; + private final Duration duration; + + KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { + this.mode = mode; + this.duration = duration; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream", + (k, m) -> new MessageEnvelope(k, m)); + Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); + + inStream + .map(m -> m) + .window(Windows.keyedSessionWindow(keyFn, duration) + .setAccumulationMode(mode)) + .map(m -> { + windowPanes.add(m); + return m; + }); + } + } + + private class IntegerMessageEnvelope extends IncomingMessageEnvelope { + IntegerMessageEnvelope(int key, int msg) { + super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg); + } + } + + private class MessageEnvelope<K, V> { + private final K key; + private final V value; + + MessageEnvelope(K key, V value) { + this.key = key; + this.value = value; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java deleted file mode 100644 index 9a425d1..0000000 --- a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.data; - -import org.apache.samza.system.SystemStreamPartition; - - -/** - * Example input {@link MessageEnvelope} w/ Json message and string as the key. - */ - -public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> { - - private final String key; - private final T data; - private final Offset offset; - private final SystemStreamPartition partition; - - public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) { - this.key = key; - this.data = data; - this.offset = offset; - this.partition = partition; - } - - @Override - public T getMessage() { - return this.data; - } - - @Override - public String getKey() { - return this.key; - } - - public Offset getOffset() { - return this.offset; - } - - public SystemStreamPartition getSystemStreamPartition() { - return this.partition; - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java new file mode 100644 index 0000000..2524c28 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + + +public class TestMessageEnvelope { + + private final String key; + private final MessageType value; + + public TestMessageEnvelope(String key, String value, long eventTime) { + this.key = key; + this.value = new MessageType(value, eventTime); + } + + public MessageType getMessage() { + return this.value; + } + + public String getKey() { + return this.key; + } + + public class MessageType { + private final String value; + private final long eventTime; + + public MessageType(String value, long eventTime) { + this.value = value; + this.eventTime = eventTime; + } + + public long getEventTime() { + return eventTime; + } + + public String getValue() { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java new file mode 100644 index 0000000..f9537a3 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + + +public class TestOutputMessageEnvelope { + private final String key; + private final Integer value; + + public TestOutputMessageEnvelope(String key, Integer value) { + this.key = key; + this.value = value; + } + + public Integer getMessage() { + return this.value; + } + + public String getKey() { + return this.key; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 5722dbd..f978c3c 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,8 +18,8 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.TestMessageEnvelope; -import org.apache.samza.operators.TestOutputMessageEnvelope; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.hamcrest.core.IsEqual; http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index 31f6f4a..267cdfc 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -18,24 +18,21 @@ */ package org.apache.samza.operators.impl; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.TestMessageEnvelope; import org.apache.samza.operators.TestMessageStreamImplUtil; -import org.apache.samza.operators.TestOutputMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.windows.Windows; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; @@ -44,6 +41,8 @@ import org.junit.Before; import org.junit.Test; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; @@ -66,11 +65,11 @@ public class TestOperatorImpls { nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators"); nextOperatorsField.setAccessible(true); - createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class, + createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class, OperatorSpec.class, Config.class, TaskContext.class); createOpMethod.setAccessible(true); - createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class); + createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class); createOpsMethod.setAccessible(true); } @@ -84,8 +83,8 @@ public class TestOperatorImpls { Config mockConfig = mock(Config.class); TaskContext mockContext = mock(TaskContext.class); - OperatorGraph opGraph = new OperatorGraph(); - OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) + OperatorImplGraph opGraph = new OperatorImplGraph(); + OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext); assertTrue(opImpl instanceof WindowOperatorImpl); Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window"); @@ -97,7 +96,7 @@ public class TestOperatorImpls { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class); when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext); + opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext); assertTrue(opImpl instanceof StreamOperatorImpl); Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn"); txfmFnField.setAccessible(true); @@ -107,7 +106,7 @@ public class TestOperatorImpls { SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { }; SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); when(sinkOp.getSinkFn()).thenReturn(sinkFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext); + opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext); assertTrue(opImpl instanceof SinkOperatorImpl); Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn"); sinkFnField.setAccessible(true); @@ -116,7 +115,7 @@ public class TestOperatorImpls { // get join operator PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class); PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class); - opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext); + opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext); assertTrue(opImpl instanceof PartialJoinOperatorImpl); } @@ -126,7 +125,7 @@ public class TestOperatorImpls { MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class); TaskContext mockContext = mock(TaskContext.class); Config mockConfig = mock(Config.class); - OperatorGraph opGraph = new OperatorGraph(); + OperatorImplGraph opGraph = new OperatorImplGraph(); RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext); assertTrue(operatorChain != null); } @@ -139,7 +138,7 @@ public class TestOperatorImpls { TaskContext mockContext = mock(TaskContext.class); Config mockConfig = mock(Config.class); testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10))); - OperatorGraph opGraph = new OperatorGraph(); + OperatorImplGraph opGraph = new OperatorImplGraph(); RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext); Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain); assertEquals(subsSet.size(), 1); @@ -160,7 +159,7 @@ public class TestOperatorImpls { Config mockConfig = mock(Config.class); testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } }); testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m); - OperatorGraph opGraph = new OperatorGraph(); + OperatorImplGraph opGraph = new OperatorImplGraph(); RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext); Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain); assertEquals(subsSet.size(), 2); @@ -208,7 +207,7 @@ public class TestOperatorImpls { } }, Duration.ofMinutes(1)) .map(m -> m); - OperatorGraph opGraph = new OperatorGraph(); + OperatorImplGraph opGraph = new OperatorImplGraph(); // now, we create chained operators from each input sources RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext); RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext); http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index ce9fdd2..abd7740 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -19,7 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; -import org.apache.samza.operators.TestOutputMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index 0a873fd..9dd161a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -22,8 +22,8 @@ import java.util.ArrayList; import java.util.Collection; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.TestMessageEnvelope; -import org.apache.samza.operators.TestOutputMessageEnvelope; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java index ec1d74c..37e3d1a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -20,16 +20,16 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.TestMessageEnvelope; import org.apache.samza.operators.TestMessageStreamImplUtil; -import org.apache.samza.operators.TestOutputMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.stream.OutputStreamInternalImpl; import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; @@ -41,58 +41,79 @@ import java.util.function.Function; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; public class TestOperatorSpecs { @Test - public void testGetStreamOperator() { - FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { - this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L)); + public void testCreateStreamOperator() { + FlatMapFunction<?, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { + this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L)); } }; MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class); - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput); - assertEquals(strmOp.getTransformFn(), transformFn); - assertEquals(strmOp.getNextStream(), mockOutput); + StreamOperatorSpec<?, TestMessageEnvelope> streamOp = + OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1); + assertEquals(streamOp.getTransformFn(), transformFn); + assertEquals(streamOp.getNextStream(), mockOutput); } @Test - public void testGetSinkOperator() { + public void testCreateSinkOperator() { SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) -> { }; - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph); + SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1); assertEquals(sinkOp.getSinkFn(), sinkFn); - assertTrue(sinkOp.getNextStream() == null); + assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK); + assertEquals(sinkOp.getNextStream(), null); + } + + @Test + public void testCreateSendToOperator() { + OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class); + SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSendToOperatorSpec(mockOutput, 1); + assertNotNull(sinkOp.getSinkFn()); + assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SEND_TO); + assertEquals(sinkOp.getNextStream(), null); + } + + + @Test + public void testCreatePartitionByOperator() { + OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class); + SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createPartitionByOperatorSpec(mockOutput, 1); + assertNotNull(sinkOp.getSinkFn()); + assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.PARTITION_BY); + assertEquals(sinkOp.getNextStream(), null); } @Test - public void testGetWindowOperator() throws Exception { + public void testCreateWindowOperator() throws Exception { Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey"; FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1; Supplier<Integer> initialValue = () -> 0; //instantiate a window using reflection WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING); - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class); - WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut); + WindowOperatorSpec spec = + OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockWndOut, 1); assertEquals(spec.getWindow(), window); assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor); assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator); } @Test - public void testGetPartialJoinOperator() { - PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn = mock(PartialJoinFunction.class); - PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn = mock(PartialJoinFunction.class); + public void testCreatePartialJoinOperator() { + PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn + = mock(PartialJoinFunction.class); + PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn + = mock(PartialJoinFunction.class); StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); MessageStreamImpl<TestOutputMessageEnvelope> joinOutput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); - PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec = - OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, mockGraph, joinOutput); + PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec + = OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, joinOutput, 1); assertEquals(partialJoinSpec.getNextStream(), joinOutput); assertEquals(partialJoinSpec.getThisPartialJoinFn(), thisPartialJoinFn); @@ -100,13 +121,15 @@ public class TestOperatorSpecs { } @Test - public void testGetMergeOperator() { + public void testCreateMergeOperator() { StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); - StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output); - Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { { - this.add(t); - } }; + StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = + OperatorSpecs.createMergeOperatorSpec(output, 1); + Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = + t -> new ArrayList<TestMessageEnvelope>() { { + this.add(t); + } }; TestMessageEnvelope t = mock(TestMessageEnvelope.class); assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t)); assertEquals(mergeOp.getNextStream(), output); http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java deleted file mode 100644 index 674a8f1..0000000 --- a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.triggers; - -import org.apache.samza.util.Clock; - -import java.time.Duration; - -/** - * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration. - * Used for testing. - */ -public class TestClock implements Clock { - - long currentTime = 1; - - public void advanceTime(Duration duration) { - currentTime += duration.toMillis(); - } - - public void advanceTime(long millis) { - currentTime += millis; - } - - @Override - public long currentTimeMillis() { - return currentTime; - } -}