This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6201be1  SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
6201be1 is described below

commit 6201be1af32955c84a2a1cb2aabdb4a48f34606f
Author: IgorDurovic <[email protected]>
AuthorDate: Wed Nov 6 09:03:10 2019 -0800

    SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
---
 .../samza/runtime/LocalApplicationRunner.java      |  6 ++-
 .../samza/runtime/TestLocalApplicationRunner.java  | 43 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 6859c07..44adfde 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -234,7 +234,11 @@ public class LocalApplicationRunner implements ApplicationRunner {
   public void kill() {
     processors.forEach(sp -> {
         sp.getLeft().stop();    // Stop StreamProcessor
-        sp.getRight().close();  // Close associated coordinator metadata store
+
+        // Coordinator stream isn't required so a null check is necessary
+        if (sp.getRight() != null) {
+          sp.getRight().close();  // Close associated coordinator metadata store
+        }
       });
     cleanup();
   }
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 02ecbd6..6c7fcb4 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -348,6 +348,49 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
+  public void testKillWithoutCoordinatorStream() throws Exception {
+    Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    config = new MapConfig(cfgs);
+    ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class);
+    mockApp = (StreamApplication) appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    prepareTest();
+
+    // return the jobConfigs from the planner
+    doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
+
+    StreamProcessor sp = mock(StreamProcessor.class);
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
+        ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStart();
+        return null;
+      }).when(sp).start();
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStop();
+        return null;
+      }).when(sp).stop();
+
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+    doReturn(null).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+    runner.run(externalContext);
+    runner.kill();
+
+    assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+  }
+
+  @Test
   public void testWaitForFinishReturnsBeforeTimeout() {
     long timeoutInMs = 1000;
 

Reply via email to