Repository: samza Updated Branches: refs/heads/master 92b67b7a3 -> b31c0dc6e
SAMZA-1288: Add null check for sink OutputStream The logic that generates JSON for the Sink operator does not check whether the output stream is null. This causes a NullPointerException. Author: Xinyu Liu <[email protected]> Reviewers: Jake Maes <[email protected]> Closes #188 from xinyuiscool/SAMZA-1288 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b31c0dc6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b31c0dc6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b31c0dc6 Branch: refs/heads/master Commit: b31c0dc6efc43ea6c80c872885283804d7f2188a Parents: 92b67b7 Author: Xinyu Liu <[email protected]> Authored: Fri May 12 09:49:04 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Fri May 12 09:49:04 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/operators/util/OperatorJsonUtils.java | 10 +++++----- .../apache/samza/execution/TestJobGraphJsonGenerator.java | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b31c0dc6/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java index b52fbc3..b971607 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java +++ b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java @@ -27,12 +27,9 @@ import java.util.stream.Collectors; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; +import org.apache.samza.operators.stream.OutputStreamInternal; public class OperatorJsonUtils { - private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class); - private static final String OP_CODE = "opCode"; private static final String OP_ID = "opId"; private static final String SOURCE_LOCATION = "sourceLocation"; @@ -59,7 +56,10 @@ public class OperatorJsonUtils { } if (spec instanceof SinkOperatorSpec) { - map.put(OUTPUT_STREAM_ID, ((SinkOperatorSpec) spec).getOutputStream().getStreamSpec().getId()); + OutputStreamInternal outputStream = ((SinkOperatorSpec) spec).getOutputStream(); + if (outputStream != null) { + map.put(OUTPUT_STREAM_ID, outputStream.getStreamSpec().getId()); + } } if (spec instanceof PartialJoinOperatorSpec) { http://git-wip-us.apache.org/repos/asf/samza/blob/b31c0dc6/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index 2681f9c..e53cd42 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -113,6 +113,7 @@ public class TestJobGraphJsonGenerator { OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn); m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1); + m2.sink((message, collector, coordinator) -> { }); m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); @@ -124,7 +125,7 @@ public class TestJobGraphJsonGenerator { ObjectMapper mapper = new ObjectMapper(); JobGraphJsonGenerator.JobGraphJson nodes = 
mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class); assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5); - assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12); + assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 13); assertTrue(nodes.sourceStreams.size() == 3); assertTrue(nodes.sinkStreams.size() == 2); assertTrue(nodes.intermediateStreams.size() == 2);
