dnishimura commented on a change in pull request #1136: SAMZA-2298: Fix 
CoordinatorStreamStore creation for LocalApplicationRunner
URL: https://github.com/apache/samza/pull/1136#discussion_r317417956
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##########
 @@ -272,15 +284,49 @@ CountDownLatch getShutdownLatch() {
   }
 
   @VisibleForTesting
-  MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+  MetadataStore createCoordinatorStreamStore(Config config) {
     if (metadataStoreFactory.isPresent()) {
-      MetadataStore coordinatorStreamStore =
-          metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new 
MetricsRegistryMap());
-      return coordinatorStreamStore;
+      // TODO: Add missing metadata store abstraction for creating the 
underlying store to address SAMZA-2182
+      if (metadataStoreFactory.get() instanceof 
CoordinatorStreamMetadataStoreFactory) {
+        if (createUnderlyingCoordinatorStream(config)) {
+          MetadataStore coordinatorStreamStore =
+              metadataStoreFactory.get().getMetadataStore("NoOp", config, new 
MetricsRegistryMap());
+          LOG.info("Created coordinator stream store of type: {}", 
coordinatorStreamStore.getClass().getSimpleName());
+          return coordinatorStreamStore;
+        }
+      } else {
+        MetadataStore otherMetadataStore =
+            metadataStoreFactory.get().getMetadataStore("NoOp", config, new 
MetricsRegistryMap());
+        LOG.info("Created alternative coordinator stream store of type: {}", 
otherMetadataStore.getClass().getSimpleName());
+        return otherMetadataStore;
+      }
     }
+
+    LOG.warn("No coordinator stream store created.");
     return null;
   }
 
+  @VisibleForTesting
+  boolean createUnderlyingCoordinatorStream(Config config) {
+    // TODO: This work around method is necessary due to SAMZA-2182 - Metadata 
store: disconnect between creation and usage of the underlying storage
+    //  and will be addressed in the next phase of metadata store abstraction
+    if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
+      LOG.warn("{} or {} not configured. Coordinator stream not created.",
+          JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+      return false;
+    }
+    SystemStream coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
 
 Review comment:
   If you look a few lines up, there's a ticket in the comment that expresses 
the same concern: `SAMZA-2182 - Metadata store: disconnect between creation and 
usage of the underlying storage`. This is a general issue with coordinator 
stream metadata store and will need to be eventually addressed (but not in this 
PR though :) )

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to