SAMZA-1890: Fix the creation of MapFunction in TestExecutionPlanner. @dnishimura, kindly take a look.
Author: Pawas Chhokra <[email protected]> Reviewers: Sanil Jain <[email protected]>, Daniel Nishimura <[email protected]> Closes #648 from PawasChhokra/TestExecutionPlanner Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c48bcd2e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c48bcd2e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c48bcd2e Branch: refs/heads/NewKafkaSystemConsumer Commit: c48bcd2e1d6892b236b232d8dc569d2388751a4c Parents: 1755268 Author: Pawas Chhokra <[email protected]> Authored: Wed Sep 19 16:34:40 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Sep 19 16:34:40 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/execution/TestExecutionPlanner.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c48bcd2e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index c089225..ad6b386 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -178,12 +178,12 @@ public class TestExecutionPlanner { OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor); messageStream1.map(m -> m) - .filter(m -> true) - .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1"); + .filter(m -> true) + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), (Serde<KV<Object, Object>>) mock(Serde.class), 
(Serde<KV<Object, Object>>) mock(Serde.class)), "w1"); messageStream2.map(m -> m) - .filter(m -> true) - .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2"); + .filter(m -> true) + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), (Serde<KV<Object, Object>>) mock(Serde.class), (Serde<KV<Object, Object>>) mock(Serde.class)), "w2"); messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);
