SAMZA-1890: Fix the creation of MapFunction in TestExecutionPlanner.

dnishimura Kindly take a look.

Author: Pawas Chhokra <[email protected]>

Reviewers: Sanil Jain <[email protected]>, Daniel Nishimura 
<[email protected]>

Closes #648 from PawasChhokra/TestExecutionPlanner


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c48bcd2e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c48bcd2e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c48bcd2e

Branch: refs/heads/NewKafkaSystemConsumer
Commit: c48bcd2e1d6892b236b232d8dc569d2388751a4c
Parents: 1755268
Author: Pawas Chhokra <[email protected]>
Authored: Wed Sep 19 16:34:40 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Sep 19 16:34:40 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/execution/TestExecutionPlanner.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c48bcd2e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index c089225..ad6b386 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -178,12 +178,12 @@ public class TestExecutionPlanner {
         OutputStream<KV<Object, Object>> output2 = 
appDesc.getOutputStream(output2Descriptor);
 
         messageStream1.map(m -> m)
-          .filter(m -> true)
-          .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), 
mock(Serde.class), mock(Serde.class)), "w1");
+            .filter(m -> true)
+            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), 
(Serde<KV<Object, Object>>) mock(Serde.class), (Serde<KV<Object, Object>>) 
mock(Serde.class)), "w1");
 
         messageStream2.map(m -> m)
-          .filter(m -> true)
-          .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), 
mock(Serde.class), mock(Serde.class)), "w2");
+            .filter(m -> true)
+            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), 
(Serde<KV<Object, Object>>) mock(Serde.class), (Serde<KV<Object, Object>>) 
mock(Serde.class)), "w2");
 
         messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, 
Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
           mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofMillis(1600), "j1").sendTo(output1);

Reply via email to