Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 373048aa0 -> 001be632d


http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
new file mode 100644
index 0000000..e08ca20
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of a simple user-defined tasks w/ window operators
+ *
+ */
+public class TestWindowExample extends TestExampleBase {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  TestWindowExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class JsonMessageEnvelope extends 
JsonIncomingSystemMessageEnvelope<MessageType> {
+
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, 
SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) 
-> c + 1;
+    inputs.keySet().forEach(source -> graph.<Object, Object, 
InputMessageEnvelope>createInStream(new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return source;
+      }
+
+      @Override public Properties getProperties() {
+        return null;
+      }
+    }, null, null).
+        map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), 
(MessageType) m1.getMessage(), m1.getOffset(),
+            
m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200),
 maxAggregator)));
+
+  }
+
+  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+    return m.getKey().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
deleted file mode 100644
index a365411..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphFactory;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class WindowGraph implements StreamGraphFactory {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  private final Set<SystemStreamPartition> inputs;
-
-  WindowGraph(Set<SystemStreamPartition> inputs) {
-    this.inputs = inputs;
-  }
-
-  class JsonMessageEnvelope extends 
JsonIncomingSystemMessageEnvelope<MessageType> {
-
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, 
SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public StreamGraph create(Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
-    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) 
-> c + 1;
-    inputs.forEach(source -> graph.<Object, Object, 
InputMessageEnvelope>createInStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return source.getSystemStream();
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, null, null).
-        map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), 
(MessageType) m1.getMessage(), m1.getOffset(),
-            
m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200),
 maxAggregator)));
-
-    return graph;
-  }
-
-  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
-    return m.getKey().toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index d5607d8..160a47a 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.KeyValueJoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -140,8 +139,22 @@ public class TestMessageStreamImpl {
     MessageStreamImpl<TestMessageEnvelope> source1 = new 
MessageStreamImpl<>(mockGraph);
     MessageStreamImpl<TestMessageEnvelope> source2 = new 
MessageStreamImpl<>(mockGraph);
     JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, 
TestOutputMessageEnvelope> joiner =
-        (KeyValueJoinFunction<String, TestMessageEnvelope, 
TestMessageEnvelope, TestOutputMessageEnvelope>)
-            (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), 
m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+      new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, 
TestOutputMessageEnvelope>() {
+        @Override
+        public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, 
TestMessageEnvelope m2) {
+          return new TestOutputMessageEnvelope(m1.getKey(), 
m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+        }
+
+        @Override
+        public String getFirstKey(TestMessageEnvelope message) {
+          return message.getKey();
+        }
+
+        @Override
+        public String getSecondKey(TestMessageEnvelope message) {
+          return message.getKey();
+        }
+      };
 
     MessageStream<TestOutputMessageEnvelope> joinOutput = 
source1.join(source2, joiner);
     Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();

http://git-wip-us.apache.org/repos/asf/samza/blob/001be632/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index ec63d41..02637a3 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.TestMessageStreamImplUtil;
 import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.KeyValueJoinFunction;
+import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -192,9 +192,22 @@ public class TestOperatorImpls {
     Config mockConfig = mock(Config.class);
     input1
         .join(input2,
-            (KeyValueJoinFunction<String, TestMessageEnvelope, 
TestMessageEnvelope, TestOutputMessageEnvelope>) (m1, m2) ->
-                new TestOutputMessageEnvelope(m1.getKey(), 
m1.getMessage().getValue().length() + m2.getMessage().getValue().length())
-            )
+            new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, 
TestOutputMessageEnvelope>() {
+              @Override
+              public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, 
TestMessageEnvelope m2) {
+                return new TestOutputMessageEnvelope(m1.getKey(), 
m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+              }
+
+              @Override
+              public String getFirstKey(TestMessageEnvelope message) {
+                return message.getKey();
+              }
+
+              @Override
+              public String getSecondKey(TestMessageEnvelope message) {
+                return message.getKey();
+              }
+            })
         .map(m -> m);
     OperatorGraph opGraph = new OperatorGraph();
     // now, we create chained operators from each input sources

Reply via email to