This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4b012d9 SAMZA-2296: Metadata streams not created in Standalone (#1132)
4b012d9 is described below
commit 4b012d90d482fc906d4737bb693f806e2ff32a3e
Author: Daniel Nishimura <[email protected]>
AuthorDate: Fri Aug 9 12:05:15 2019 -0700
SAMZA-2296: Metadata streams not created in Standalone (#1132)
* SAMZA-2296: Metadata streams not created in Standalone
* Trigger
---
.../clustermanager/ClusterBasedJobCoordinator.java | 2 +-
.../samza/coordinator/MetadataResourceUtil.java | 21 ++++++-----
.../standalone/PassthroughJobCoordinator.java | 2 +-
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 6 +--
.../apache/samza/job/local/ProcessJobFactory.scala | 2 +-
.../apache/samza/job/local/ThreadJobFactory.scala | 2 +-
.../coordinator/TestMetadataResourceUtil.java | 44 ++++++++++++++++++----
.../org/apache/samza/zk/TestZkJobCoordinator.java | 2 +-
8 files changed, 57 insertions(+), 24 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index ef46205..b22bf23 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -234,7 +234,7 @@ public class ClusterBasedJobCoordinator {
//create necessary checkpoint and changelog streams, if not created
JobModel jobModel = jobModelManager.jobModel();
MetadataResourceUtil metadataResourceUtil =
- new MetadataResourceUtil(jobModel, this.metrics,
getClass().getClassLoader());
+ new MetadataResourceUtil(jobModel, this.metrics,
getClass().getClassLoader(), config);
metadataResourceUtil.createResources();
// fan out the startpoints
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 148e942..ee59ca0 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator;
import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
@@ -33,24 +34,20 @@ import org.apache.samza.storage.ChangelogStreamManager;
// TODO: Replace with a metadata admin interface when the {@link
MetadataStore} is fully augmented to handle all metadata sources.
public class MetadataResourceUtil {
private final CheckpointManager checkpointManager;
+ private final Config config;
private final JobModel jobModel; // TODO: Should be loaded by metadata store
in the future
/**
* @param jobModel the loaded {@link JobModel}
* @param metricsRegistry the registry for reporting metrics.
*/
- public MetadataResourceUtil(JobModel jobModel, MetricsRegistry
metricsRegistry, ClassLoader classLoader) {
+ public MetadataResourceUtil(JobModel jobModel, MetricsRegistry
metricsRegistry, ClassLoader classLoader, Config config) {
+ this.config = config;
this.jobModel = jobModel;
- TaskConfig taskConfig = new TaskConfig(jobModel.getConfig());
+ TaskConfig taskConfig = new TaskConfig(config);
this.checkpointManager = taskConfig.getCheckpointManager(metricsRegistry,
classLoader).orElse(null);
}
- @VisibleForTesting
- MetadataResourceUtil(CheckpointManager checkpointManager, JobModel jobModel)
{
- this.jobModel = jobModel;
- this.checkpointManager = checkpointManager;
- }
-
/**
* Creates and loads the required metadata resources for checkpoints,
changelog stream and other
* resources related to the metadata system
@@ -62,7 +59,13 @@ public class MetadataResourceUtil {
createChangelogStreams();
}
+ @VisibleForTesting
void createChangelogStreams() {
- ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(),
jobModel.maxChangeLogStreamPartitions);
+ ChangelogStreamManager.createChangelogStreams(config,
jobModel.maxChangeLogStreamPartitions);
+ }
+
+ @VisibleForTesting
+ CheckpointManager getCheckpointManager() {
+ return checkpointManager;
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index b7d43f6..716b367 100644
---
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -86,7 +86,7 @@ public class PassthroughJobCoordinator implements
JobCoordinator {
try {
jobModel = getJobModel();
// TODO metrics registry has been null here for a while; is it safe?
- MetadataResourceUtil metadataResourceUtil = new
MetadataResourceUtil(jobModel, null, getClass().getClassLoader());
+ MetadataResourceUtil metadataResourceUtil = new
MetadataResourceUtil(jobModel, null, getClass().getClassLoader(), config);
metadataResourceUtil.createResources();
} catch (Exception e) {
LOGGER.error("Exception while trying to getJobModel.", e);
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index b4a2481..80bf92e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -313,7 +313,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@VisibleForTesting
void loadMetadataResources(JobModel jobModel) {
try {
- MetadataResourceUtil metadataResourceUtil =
createMetadataResourceUtil(jobModel, getClass().getClassLoader());
+ MetadataResourceUtil metadataResourceUtil =
createMetadataResourceUtil(jobModel, getClass().getClassLoader(), config);
metadataResourceUtil.createResources();
if (coordinatorStreamStore != null) {
@@ -343,8 +343,8 @@ public class ZkJobCoordinator implements JobCoordinator {
}
@VisibleForTesting
- MetadataResourceUtil createMetadataResourceUtil(JobModel jobModel,
ClassLoader classLoader) {
- return new MetadataResourceUtil(jobModel, metrics.getMetricsRegistry(),
classLoader);
+ MetadataResourceUtil createMetadataResourceUtil(JobModel jobModel,
ClassLoader classLoader, Config config) {
+ return new MetadataResourceUtil(jobModel, metrics.getMetricsRegistry(),
classLoader, config);
}
/**
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index b1cfbe6..746c234 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -70,7 +70,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging
{
changelogStreamManager.writePartitionMapping(taskPartitionMappings)
//create necessary checkpoint and changelog streams
- val metadataResourceUtil = new MetadataResourceUtil(jobModel,
metricsRegistry, classLoader)
+ val metadataResourceUtil = new MetadataResourceUtil(jobModel,
metricsRegistry, classLoader, config)
metadataResourceUtil.createResources()
// fan out the startpoints
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index f9116fe..fc8296b 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -71,7 +71,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
changelogStreamManager.writePartitionMapping(taskPartitionMappings)
//create necessary checkpoint and changelog streams
- val metadataResourceUtil = new MetadataResourceUtil(jobModel,
metricsRegistry, classLoader)
+ val metadataResourceUtil = new MetadataResourceUtil(jobModel,
metricsRegistry, classLoader, config)
metadataResourceUtil.createResources()
// fan out the startpoints
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
b/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
index dc86704..7ff85e1 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/TestMetadataResourceUtil.java
@@ -19,30 +19,60 @@
package org.apache.samza.coordinator;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestMetadataResourceUtil {
- private CheckpointManager checkpointManager;
- private JobModel jobModel;
+ private JobModel mockJobModel;
+ private MetricsRegistry mockMetricsRegistry;
@Before
public void setUp() {
- checkpointManager = Mockito.mock(CheckpointManager.class);
- jobModel = Mockito.mock(JobModel.class);
+ mockJobModel = Mockito.mock(JobModel.class);
+ mockMetricsRegistry = Mockito.mock(MetricsRegistry.class);
}
@Test
- public void testLoad() {
- MetadataResourceUtil metadataResourceUtil = Mockito.spy(new
MetadataResourceUtil(checkpointManager, jobModel));
+ public void testLoadWithCheckpointConfigured() {
+ MapConfig mapConfig = new
MapConfig(ImmutableMap.of(TaskConfig.CHECKPOINT_MANAGER_FACTORY,
+ TestCheckpointManagerFactory.class.getName()));
+ MetadataResourceUtil metadataResourceUtil = Mockito.spy(new
MetadataResourceUtil(mockJobModel, mockMetricsRegistry,
getClass().getClassLoader(), mapConfig));
Mockito.doNothing().when(metadataResourceUtil).createChangelogStreams();
metadataResourceUtil.createResources();
- Mockito.verify(checkpointManager).createResources();
+ Mockito.verify(mockJobModel, Mockito.never()).getConfig(); // Never get
config from job model. In standalone, this is empty.
+
Mockito.verify(metadataResourceUtil.getCheckpointManager()).createResources();
Mockito.verify(metadataResourceUtil).createChangelogStreams();
}
+
+
+ @Test
+ public void testLoadWithoutCheckpointConfigured() {
+ MapConfig mapConfig = new MapConfig();
+ MetadataResourceUtil metadataResourceUtil = Mockito.spy(new
MetadataResourceUtil(mockJobModel, mockMetricsRegistry,
getClass().getClassLoader(), mapConfig));
+ Mockito.doNothing().when(metadataResourceUtil).createChangelogStreams();
+
+ metadataResourceUtil.createResources();
+ Mockito.verify(mockJobModel, Mockito.never()).getConfig(); // Never get
config from job model. In standalone, this is empty.
+ Assert.assertNull(metadataResourceUtil.getCheckpointManager());
+ Mockito.verify(metadataResourceUtil).createChangelogStreams();
+ }
+
+ public static class TestCheckpointManagerFactory implements
CheckpointManagerFactory {
+ @Override
+ public CheckpointManager getCheckpointManager(Config config,
MetricsRegistry registry) {
+ return Mockito.mock(CheckpointManager.class);
+ }
+ }
}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 1b3ee12..3492035 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -272,7 +272,7 @@ public class TestZkJobCoordinator {
MetadataResourceUtil mockMetadataResourceUtil =
mock(MetadataResourceUtil.class);
doReturn(mockMetadataResourceUtil).when(zkJobCoordinator)
- .createMetadataResourceUtil(any(), eq(getClass().getClassLoader()));
+ .createMetadataResourceUtil(any(), eq(getClass().getClassLoader()),
any(Config.class));
verifyZeroInteractions(mockStartpointManager);