This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b5a32b1 SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner (#1136)
e0b5a32b1 is described below

commit e0b5a32b1ca542f0409142c667bf0bcec68d5cb3
Author: Daniel Nishimura <[email protected]>
AuthorDate: Fri Aug 30 10:26:06 2019 -0700

    SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner (#1136)
---
 .../samza/runtime/LocalApplicationRunner.java      | 66 +++++++++++++---
 .../samza/runtime/TestLocalApplicationRunner.java  | 90 ++++++++++++++++++++--
 2 files changed, 140 insertions(+), 16 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 9337d9b..cdf1a33 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -57,10 +57,15 @@ import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkMetadataStoreFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,7 +101,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
    * @param config configuration for the application
    */
   public LocalApplicationRunner(SamzaApplication app, Config config) {
-    this(app, config, getMetadataStoreFactory(new JobConfig(config)));
+    this(app, config, getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(config)));
   }
 
   /**
@@ -121,15 +126,22 @@ public class LocalApplicationRunner implements ApplicationRunner {
     this.appDesc = appDesc;
     this.isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
     this.coordinationUtils = coordinationUtils;
-    this.metadataStoreFactory = 
Optional.ofNullable(getMetadataStoreFactory(new 
JobConfig(appDesc.getConfig())));
+    this.metadataStoreFactory = 
Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(appDesc.getConfig())));
   }
 
-  static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
-    if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
+  static MetadataStoreFactory 
getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
+    String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
+    JobCoordinatorConfig jobCoordinatorConfig = new 
JobCoordinatorConfig(jobConfig);
+    String jobCoordinatorFactoryClassName = 
jobCoordinatorConfig.getJobCoordinatorFactoryClassName();
+
+    // TODO: Remove restriction to only ZkJobCoordinator after next phase of 
metadata store abstraction.
+    if (StringUtils.isNotBlank(coordinatorSystemName) && 
ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) 
{
       return new CoordinatorStreamMetadataStoreFactory();
     }
-    LOG.warn("{} or {} not configured. No coordinator stream metadata store 
will be created.",
-        JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+
+    LOG.warn("{} or {} not configured, or {} is not {}. No default coordinator 
stream metadata store will be created.",
+        JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM,
+        JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
ZkJobCoordinatorFactory.class.getName());
     return null;
   }
 
@@ -272,16 +284,50 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
   }
 
   @VisibleForTesting
-  MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+  MetadataStore createCoordinatorStreamStore(Config config) {
     if (metadataStoreFactory.isPresent()) {
-      MetadataStore coordinatorStreamStore =
-          metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new 
MetricsRegistryMap());
-      return coordinatorStreamStore;
+      // TODO: Add missing metadata store abstraction for creating the 
underlying store to address SAMZA-2182
+      if (metadataStoreFactory.get() instanceof 
CoordinatorStreamMetadataStoreFactory) {
+        if (createUnderlyingCoordinatorStream(config)) {
+          MetadataStore coordinatorStreamStore =
+              metadataStoreFactory.get().getMetadataStore("NoOp", config, new 
MetricsRegistryMap());
+          LOG.info("Created coordinator stream store of type: {}", 
coordinatorStreamStore.getClass().getSimpleName());
+          return coordinatorStreamStore;
+        }
+      } else {
+        MetadataStore otherMetadataStore =
+            metadataStoreFactory.get().getMetadataStore("NoOp", config, new 
MetricsRegistryMap());
+        LOG.info("Created alternative coordinator stream store of type: {}", 
otherMetadataStore.getClass().getSimpleName());
+        return otherMetadataStore;
+      }
     }
+
+    LOG.warn("No coordinator stream store created.");
     return null;
   }
 
   @VisibleForTesting
+  boolean createUnderlyingCoordinatorStream(Config config) {
+    // TODO: This work around method is necessary due to SAMZA-2182 - Metadata 
store: disconnect between creation and usage of the underlying storage
+    //  and will be addressed in the next phase of metadata store abstraction
+    if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
+      LOG.warn("{} or {} not configured. Coordinator stream not created.",
+          JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+      return false;
+    }
+    SystemStream coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    systemAdmins.start();
+    try {
+      SystemAdmin coordinatorSystemAdmin = 
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+      CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, 
coordinatorSystemAdmin);
+    } finally {
+      systemAdmins.stop();
+    }
+    return true;
+  }
+
+  @VisibleForTesting
   StreamProcessor createStreamProcessor(Config config, 
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,
       Optional<ExternalContext> externalContextOptional, MetadataStore 
coordinatorStreamStore) {
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 4acc8c3..71100c4 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -35,17 +35,25 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.coordinator.ClusterMembership;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLock;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.InMemoryMetadataStore;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.task.IdentityStreamTask;
 import org.apache.samza.zk.ZkMetadataStore;
 import org.apache.samza.zk.ZkMetadataStoreFactory;
@@ -476,24 +484,94 @@ public class TestLocalApplicationRunner {
     
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
   }
 
+  /**
+   * Default metadata store factory should be null if no job coordinator 
system defined and the default
+   * ZkJobCoordinator is used.
+   */
   @Test
-  public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
+  public void 
testGetCoordinatorStreamStoreFactoryWithoutJobCoordinatorSystem() {
     MetadataStoreFactory metadataStoreFactory =
-        LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new 
MapConfig()));
+        LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(new MapConfig()));
     assertNull(metadataStoreFactory);
   }
 
+  /**
+   * Default metadata store factory should not be null if job coordinator 
system defined and the default
+   * ZkJobCoordinator is used.
+   */
   @Test
-  public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
+  public void testGetCoordinatorStreamStoreFactoryWithJobCoordinatorSystem() {
     MetadataStoreFactory metadataStoreFactory =
-        LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
+        LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, 
"test-system"))));
     assertNotNull(metadataStoreFactory);
   }
 
+  /**
+   * Default metadata store factory should not be null if default system 
defined and the default
+   * ZkJobCoordinator is used.
+   */
   @Test
-  public void testGetMetadataStoreFactoryWithDefaultSystem() {
+  public void testGetCoordinatorStreamStoreFactoryWithDefaultSystem() {
     MetadataStoreFactory metadataStoreFactory =
-        LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
+        LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, 
"test-system"))));
     assertNotNull(metadataStoreFactory);
   }
+
+  /**
+   * Default metadata store factory should be null if job coordinator system or default system is defined and a non
+   * ZkJobCoordinator job coordinator is used.
+   */
+  @Test
+  public void testGetCoordinatorStreamStoreFactoryWithNonZkJobCoordinator() {
+    MapConfig mapConfig = new MapConfig(
+        ImmutableMap.of(
+            JobConfig.JOB_DEFAULT_SYSTEM, "test-system",
+            JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName()));
+    MetadataStoreFactory metadataStoreFactory =
+        LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(mapConfig));
+    assertNull(metadataStoreFactory);
+  }
+
+  /**
+   * Underlying coordinator stream should be created if using 
CoordinatorStreamMetadataStoreFactory
+   * @throws Exception
+   */
+  @Test
+  public void testCreateCoordinatorStreamWithCoordinatorFactory() throws 
Exception {
+    CoordinatorStreamStore coordinatorStreamStore = 
mock(CoordinatorStreamStore.class);
+    CoordinatorStreamMetadataStoreFactory 
coordinatorStreamMetadataStoreFactory = 
mock(CoordinatorStreamMetadataStoreFactory.class);
+    
doReturn(coordinatorStreamStore).when(coordinatorStreamMetadataStoreFactory).getMetadataStore(anyString(),
 any(Config.class), any(
+        MetricsRegistry.class));
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
+    LocalApplicationRunner localApplicationRunner =
+        spy(new LocalApplicationRunner(mockApp, config, 
coordinatorStreamMetadataStoreFactory));
+
+    // create store only if successful in creating the underlying coordinator 
stream
+    
doReturn(true).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+    assertEquals(coordinatorStreamStore, 
localApplicationRunner.createCoordinatorStreamStore(config));
+    
verify(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+
+    // do not create store if creating the underlying coordinator stream fails
+    
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+    assertNull(localApplicationRunner.createCoordinatorStreamStore(config));
+  }
+
+  /**
+   * Underlying coordinator stream should not be created if not using 
CoordinatorStreamMetadataStoreFactory
+   * @throws Exception
+   */
+  @Test
+  public void testCreateCoordinatorStreamWithoutCoordinatorFactory() throws 
Exception {
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
+    LocalApplicationRunner localApplicationRunner =
+        spy(new LocalApplicationRunner(mockApp, config, new 
InMemoryMetadataStoreFactory()));
+    
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+    MetadataStore coordinatorStreamStore = 
localApplicationRunner.createCoordinatorStreamStore(config);
+    assertTrue(coordinatorStreamStore instanceof InMemoryMetadataStore);
+
+    // creating underlying coordinator stream should not be called for other 
coordinator stream metadata store types.
+    verify(localApplicationRunner, 
never()).createUnderlyingCoordinatorStream(eq(config));
+  }
 }

Reply via email to