Repository: samza Updated Branches: refs/heads/master 2f1003bc1 -> e312bb552
Fix potential null pointer issues in StreamOperatorTask Author: bharathkk <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Shanthoosh Venkatraman <[email protected]> Closes #699 from bharathkk/bug-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e312bb55 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e312bb55 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e312bb55 Branch: refs/heads/master Commit: e312bb552640c4a97b0a99f05586fe6224120537 Parents: 2f1003b Author: bharathkk <[email protected]> Authored: Mon Oct 8 16:41:48 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Mon Oct 8 16:41:48 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/task/StreamOperatorTask.java | 4 +++- .../samza/task/TestStreamOperatorTask.java | 25 ++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e312bb55/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index 2ca4e81..aa896c2 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -136,7 +136,9 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT if (this.contextManager != null) { this.contextManager.close(); } - operatorImplGraph.close(); + if (operatorImplGraph != null) { + operatorImplGraph.close(); + } } /* package private for testing */ http://git-wip-us.apache.org/repos/asf/samza/blob/e312bb55/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java index 45b08d7..1bc23d4 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java @@ -19,7 +19,14 @@ package org.apache.samza.task; +import org.apache.samza.config.Config; +import org.apache.samza.operators.ContextManager; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.impl.OperatorImplGraph; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; public class TestStreamOperatorTask { @@ -27,4 +34,22 @@ public class TestStreamOperatorTask { public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask task) { return task.getOperatorImplGraph(); } + + @Test + public void testCloseDuringInitializationErrors() { + ContextManager mockContextManager = mock(ContextManager.class); + StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mockContextManager); + + doThrow(new RuntimeException("Failed to initialize context manager")) + .when(mockContextManager).init(any(), any()); + + try { + operatorTask.init(mock(Config.class), mock(TaskContext.class)); + operatorTask.close(); + } catch (Exception e) { + if (e instanceof NullPointerException) { + fail("Unexpected null pointer exception"); + } + } + } }
