This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b012d9  SAMZA-2296: Metadata streams not created in Standalone (#1132)
4b012d9 is described below

commit 4b012d90d482fc906d4737bb693f806e2ff32a3e
Author: Daniel Nishimura <[email protected]>
AuthorDate: Fri Aug 9 12:05:15 2019 -0700

    SAMZA-2296: Metadata streams not created in Standalone (#1132)
    
    * SAMZA-2296: Metadata streams not created in Standalone
    
    * Trigger
---
 .../clustermanager/ClusterBasedJobCoordinator.java |  2 +-
 .../samza/coordinator/MetadataResourceUtil.java    | 21 ++++++-----
 .../standalone/PassthroughJobCoordinator.java      |  2 +-
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  6 +--
 .../apache/samza/job/local/ProcessJobFactory.scala |  2 +-
 .../apache/samza/job/local/ThreadJobFactory.scala  |  2 +-
 .../coordinator/TestMetadataResourceUtil.java      | 44 ++++++++++++++++++----
 .../org/apache/samza/zk/TestZkJobCoordinator.java  |  2 +-
 8 files changed, 57 insertions(+), 24 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index ef46205..b22bf23 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -234,7 +234,7 @@ public class ClusterBasedJobCoordinator {
       //create necessary checkpoint and changelog streams, if not created
       JobModel jobModel = jobModelManager.jobModel();
       MetadataResourceUtil metadataResourceUtil =
-          new MetadataResourceUtil(jobModel, this.metrics, getClass().getClassLoader());
+          new MetadataResourceUtil(jobModel, this.metrics, getClass().getClassLoader(), config);
       metadataResourceUtil.createResources();
 
       // fan out the startpoints
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 148e942..ee59ca0 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -33,24 +34,20 @@ import org.apache.samza.storage.ChangelogStreamManager;
 // TODO: Replace with a metadata admin interface when the {@link MetadataStore} is fully augmented to handle all metadata sources.
 public class MetadataResourceUtil {
   private final CheckpointManager checkpointManager;
+  private final Config config;
   private final JobModel jobModel; // TODO: Should be loaded by metadata store in the future
 
   /**
    * @param jobModel the loaded {@link JobModel}
    * @param metricsRegistry the registry for reporting metrics.
    */
-  public MetadataResourceUtil(JobModel jobModel, MetricsRegistry metricsRegistry, ClassLoader classLoader) {
+  public MetadataResourceUtil(JobModel jobModel, MetricsRegistry metricsRegistry, ClassLoader classLoader, Config config) {
+    this.config = config;
     this.jobModel = jobModel;
-    TaskConfig taskConfig = new TaskConfig(jobModel.getConfig());
+    TaskConfig taskConfig = new TaskConfig(config);
     this.checkpointManager = taskConfig.getCheckpointManager(metricsRegistry, classLoader).orElse(null);
   }
 
-  @VisibleForTesting
-  MetadataResourceUtil(CheckpointManager checkpointManager, JobModel jobModel) {
-    this.jobModel = jobModel;
-    this.checkpointManager = checkpointManager;
-  }
-
   /**
    * Creates and loads the required metadata resources for checkpoints, changelog stream and other
    * resources related to the metadata system
@@ -62,7 +59,13 @@ public class MetadataResourceUtil {
     createChangelogStreams();
   }
 
+  @VisibleForTesting
   void createChangelogStreams() {
-    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
+    ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
+  }
+
+  @VisibleForTesting
+  CheckpointManager getCheckpointManager() {
+    return checkpointManager;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index b7d43f6..716b367 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -86,7 +86,7 @@ public class PassthroughJobCoordinator implements JobCoordinator {
     try {
       jobModel = getJobModel();
       // TODO metrics registry has been null here for a while; is it safe?
-      MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, null, getClass().getClassLoader());
+      MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, null, getClass().getClassLoader(), config);
       metadataResourceUtil.createResources();
     } catch (Exception e) {
       LOGGER.error("Exception while trying to getJobModel.", e);
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index b4a2481..80bf92e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -313,7 +313,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   void loadMetadataResources(JobModel jobModel) {
     try {
-      MetadataResourceUtil metadataResourceUtil = createMetadataResourceUtil(jobModel, getClass().getClassLoader());
+      MetadataResourceUtil metadataResourceUtil = createMetadataResourceUtil(jobModel, getClass().getClassLoader(), config);
       metadataResourceUtil.createResources();
 
       if (coordinatorStreamStore != null) {
@@ -343,8 +343,8 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   @VisibleForTesting
-  MetadataResourceUtil createMetadataResourceUtil(JobModel jobModel, ClassLoader classLoader) {
-    return new MetadataResourceUtil(jobModel, metrics.getMetricsRegistry(), classLoader);
+  MetadataResourceUtil createMetadataResourceUtil(JobModel jobModel, ClassLoader classLoader, Config config) {
+    return new MetadataResourceUtil(jobModel, metrics.getMetricsRegistry(), classLoader, config);
   }
 
   /**
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index b1cfbe6..746c234 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -70,7 +70,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
     //create necessary checkpoint and changelog streams
-    val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, classLoader)
+    val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, classLoader, config)
     metadataResourceUtil.createResources()
 
     // fan out the startpoints
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index f9116fe..fc8296b 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -71,7 +71,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
     //create necessary checkpoint and changelog streams
-    val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, classLoader)
+    val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, classLoader, config)
     metadataResourceUtil.createResources()
 
     // fan out the startpoints
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
index dc86704..7ff85e1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
@@ -19,30 +19,60 @@
 
 package org.apache.samza.coordinator;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 
 public class TestMetadataResourceUtil {
-  private CheckpointManager checkpointManager;
-  private JobModel jobModel;
+  private JobModel mockJobModel;
+  private MetricsRegistry mockMetricsRegistry;
 
   @Before
   public void setUp() {
-    checkpointManager = Mockito.mock(CheckpointManager.class);
-    jobModel = Mockito.mock(JobModel.class);
+    mockJobModel = Mockito.mock(JobModel.class);
+    mockMetricsRegistry = Mockito.mock(MetricsRegistry.class);
   }
 
   @Test
-  public void testLoad() {
-    MetadataResourceUtil metadataResourceUtil = Mockito.spy(new MetadataResourceUtil(checkpointManager, jobModel));
+  public void testLoadWithCheckpointConfigured() {
+    MapConfig mapConfig = new MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY,
+        TestCheckpointManagerFactory.class.getName()));
+    MetadataResourceUtil metadataResourceUtil = Mockito.spy(new MetadataResourceUtil(mockJobModel, mockMetricsRegistry, getClass().getClassLoader(), mapConfig));
     Mockito.doNothing().when(metadataResourceUtil).createChangelogStreams();
 
     metadataResourceUtil.createResources();
-    Mockito.verify(checkpointManager).createResources();
+    Mockito.verify(mockJobModel, Mockito.never()).getConfig(); // Never get config from job model. In standalone, this is empty.
+    Mockito.verify(metadataResourceUtil.getCheckpointManager()).createResources();
     Mockito.verify(metadataResourceUtil).createChangelogStreams();
   }
+
+
+  @Test
+  public void testLoadWithoutCheckpointConfigured() {
+    MapConfig mapConfig = new MapConfig();
+    MetadataResourceUtil metadataResourceUtil = Mockito.spy(new MetadataResourceUtil(mockJobModel, mockMetricsRegistry, getClass().getClassLoader(), mapConfig));
+    Mockito.doNothing().when(metadataResourceUtil).createChangelogStreams();
+
+    metadataResourceUtil.createResources();
+    Mockito.verify(mockJobModel, Mockito.never()).getConfig(); // Never get config from job model. In standalone, this is empty.
+    Assert.assertNull(metadataResourceUtil.getCheckpointManager());
+    Mockito.verify(metadataResourceUtil).createChangelogStreams();
+  }
+
+  public static class TestCheckpointManagerFactory implements CheckpointManagerFactory {
+    @Override
+    public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) {
+      return Mockito.mock(CheckpointManager.class);
+    }
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 1b3ee12..3492035 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -272,7 +272,7 @@ public class TestZkJobCoordinator {
 
     MetadataResourceUtil mockMetadataResourceUtil = mock(MetadataResourceUtil.class);
     doReturn(mockMetadataResourceUtil).when(zkJobCoordinator)
-        .createMetadataResourceUtil(any(), eq(getClass().getClassLoader()));
+        .createMetadataResourceUtil(any(), eq(getClass().getClassLoader()), any(Config.class));
 
     verifyZeroInteractions(mockStartpointManager);
 

Reply via email to