This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new affded2  SAMZA-2294: LocalApplicationRunner support for missing 
coordinator stream (#1130)
affded2 is described below

commit affded20e21cdf2694cab1cb40948209c94a4aaa
Author: Daniel Nishimura <[email protected]>
AuthorDate: Wed Aug 7 11:05:47 2019 -0700

    SAMZA-2294: LocalApplicationRunner support for missing coordinator stream 
(#1130)
---
 .../samza/runtime/LocalApplicationRunner.java      | 30 ++++++++---
 .../samza/runtime/TestLocalApplicationRunner.java  | 59 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 8 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 5549570..9337d9b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -83,7 +83,7 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
   private final AtomicReference<Throwable> failure = new AtomicReference<>();
   private final boolean isAppModeBatch;
   private final Optional<CoordinationUtils> coordinationUtils;
-  private final MetadataStoreFactory metadataStoreFactory;
+  private final Optional<MetadataStoreFactory> metadataStoreFactory;
   private Optional<String> runId = Optional.empty();
   private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
 
@@ -96,7 +96,7 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
    * @param config configuration for the application
    */
   public LocalApplicationRunner(SamzaApplication app, Config config) {
-    this(app, config, new CoordinatorStreamMetadataStoreFactory());
+    this(app, config, getMetadataStoreFactory(new JobConfig(config)));
   }
 
   /**
@@ -110,7 +110,7 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
     this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
     this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
     this.coordinationUtils = getCoordinationUtils(config, 
getClass().getClassLoader());
-    this.metadataStoreFactory = metadataStoreFactory;
+    this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
   }
 
   /**
@@ -121,7 +121,16 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
     this.appDesc = appDesc;
     this.isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
     this.coordinationUtils = coordinationUtils;
-    this.metadataStoreFactory = new CoordinatorStreamMetadataStoreFactory();
+    this.metadataStoreFactory = 
Optional.ofNullable(getMetadataStoreFactory(new 
JobConfig(appDesc.getConfig())));
+  }
+
+  static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
+    if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
+      return new CoordinatorStreamMetadataStoreFactory();
+    }
+    LOG.warn("{} or {} not configured. No coordinator stream metadata store 
will be created.",
+        JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+    return null;
   }
 
   private Optional<CoordinationUtils> getCoordinationUtils(Config config, 
ClassLoader classLoader) {
@@ -189,7 +198,9 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
       jobConfigs.forEach(jobConfig -> {
           LOG.debug("Starting job {} StreamProcessor with config {}", 
jobConfig.getName(), jobConfig);
           MetadataStore coordinatorStreamStore = 
createCoordinatorStreamStore(jobConfig);
-          coordinatorStreamStore.init();
+          if (coordinatorStreamStore != null) {
+            coordinatorStreamStore.init();
+          }
           StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
               sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), 
Optional.ofNullable(externalContext), coordinatorStreamStore);
           processors.add(Pair.of(processor, coordinatorStreamStore));
@@ -262,9 +273,12 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
 
   @VisibleForTesting
   MetadataStore createCoordinatorStreamStore(Config jobConfig) {
-    MetadataStore coordinatorStreamStore =
-        metadataStoreFactory.getMetadataStore("NoOp", jobConfig, new 
MetricsRegistryMap());
-    return coordinatorStreamStore;
+    if (metadataStoreFactory.isPresent()) {
+      MetadataStore coordinatorStreamStore =
+          metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new 
MetricsRegistryMap());
+      return coordinatorStreamStore;
+    }
+    return null;
   }
 
   @VisibleForTesting
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 9e63cc4..4acc8c3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -44,6 +44,7 @@ import org.apache.samza.coordinator.DistributedLock;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.task.IdentityStreamTask;
 import org.apache.samza.zk.ZkMetadataStore;
@@ -60,6 +61,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
@@ -211,6 +213,43 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
+  public void testRunCompleteWithouCoordinatorStreamStore() throws Exception {
+    Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
+    config = new MapConfig(cfgs);
+    ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> 
mock(ProcessorLifecycleListener.class);
+    mockApp = (StreamApplication) appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    prepareTest();
+
+    // return the jobConfigs from the planner
+    doReturn(Collections.singletonList(new JobConfig(new 
MapConfig(config)))).when(localPlanner).prepareJobs();
+
+    StreamProcessor sp = mock(StreamProcessor.class);
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> 
captor =
+        
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = 
captor.getValue().createInstance(sp);
+        listener.afterStart();
+        listener.afterStop();
+        return null;
+      }).when(sp).start();
+
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), 
eq(Optional.of(externalContext)), eq(null));
+    
doReturn(null).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+    runner.run(externalContext);
+    runner.waitForFinish();
+
+    assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+  }
+
+  @Test
   public void testRunFailure() throws Exception {
     Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.PROCESSOR_ID, "0");
@@ -437,4 +476,24 @@ public class TestLocalApplicationRunner {
     
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
   }
 
+  @Test
+  public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
+    MetadataStoreFactory metadataStoreFactory =
+        LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new 
MapConfig()));
+    assertNull(metadataStoreFactory);
+  }
+
+  @Test
+  public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
+    MetadataStoreFactory metadataStoreFactory =
+        LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
+    assertNotNull(metadataStoreFactory);
+  }
+
+  @Test
+  public void testGetMetadataStoreFactoryWithDefaultSystem() {
+    MetadataStoreFactory metadataStoreFactory =
+        LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
+    assertNotNull(metadataStoreFactory);
+  }
 }

Reply via email to