Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 373048aa0 -> 001be632d
http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java new file mode 100644 index 0000000..e08ca20 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of a simple user-defined tasks w/ window operators + * + */ +public class TestWindowExample extends TestExampleBase { + class MessageType { + String field1; + String field2; + } + + TestWindowExample(Set<SystemStreamPartition> inputs) { + super(inputs); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + @Override + public void init(StreamGraph graph, Config config) { + BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; + inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return source; + } + + @Override public Properties getProperties() { + return null; + } + }, null, null). + map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), + m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); + + } + + String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { + return m.getKey().toString(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java deleted file mode 100644 index a365411..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamGraphFactory; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.function.BiFunction; -import java.util.Properties; -import java.util.Set; - - -/** - * Example implementation of a simple user-defined tasks w/ window operators - * - */ -public class WindowGraph implements StreamGraphFactory { - class MessageType { - String field1; - String field2; - } - - private final Set<SystemStreamPartition> inputs; - - WindowGraph(Set<SystemStreamPartition> inputs) { - this.inputs = inputs; - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - public StreamGraph create(Config config) { - StreamGraphImpl graph = new StreamGraphImpl(); - BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; - inputs.forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return source.getSystemStream(); - } - - @Override public Properties getProperties() { - return null; - } - }, null, null). - map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), - m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); - - return graph; - } - - String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { - return m.getKey().toString(); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index d5607d8..160a47a 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -21,7 +21,6 @@ package org.apache.samza.operators; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.KeyValueJoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; @@ -140,8 +139,22 @@ public class TestMessageStreamImpl { MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph); MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph); JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner = - (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>) - (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); + new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() { + @Override + public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) { + return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); + } + + @Override + public String getFirstKey(TestMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(TestMessageEnvelope message) { + return message.getKey(); + } + }; MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner); Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs(); http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index ec63d41..02637a3 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -28,7 +28,7 @@ import org.apache.samza.operators.TestMessageStreamImplUtil; import org.apache.samza.operators.TestOutputMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.KeyValueJoinFunction; +import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; @@ -192,9 +192,22 @@ public class TestOperatorImpls { Config mockConfig = mock(Config.class); input1 .join(input2, - (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>) (m1, m2) -> - new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()) - ) + new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() { + @Override + public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) { + return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); + } + + @Override + public String getFirstKey(TestMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(TestMessageEnvelope message) { + return message.getKey(); + } + }) .map(m -> m); OperatorGraph opGraph = new OperatorGraph(); // now, we create chained operators from each input sources
