This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new affded2 SAMZA-2294: LocalApplicationRunner support for missing coordinator stream (#1130)
affded2 is described below
commit affded20e21cdf2694cab1cb40948209c94a4aaa
Author: Daniel Nishimura <[email protected]>
AuthorDate: Wed Aug 7 11:05:47 2019 -0700
SAMZA-2294: LocalApplicationRunner support for missing coordinator stream (#1130)
---
.../samza/runtime/LocalApplicationRunner.java | 30 ++++++++---
.../samza/runtime/TestLocalApplicationRunner.java | 59 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 8 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 5549570..9337d9b 100644
---
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -83,7 +83,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private final boolean isAppModeBatch;
private final Optional<CoordinationUtils> coordinationUtils;
- private final MetadataStoreFactory metadataStoreFactory;
+ private final Optional<MetadataStoreFactory> metadataStoreFactory;
private Optional<String> runId = Optional.empty();
private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
@@ -96,7 +96,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
* @param config configuration for the application
*/
public LocalApplicationRunner(SamzaApplication app, Config config) {
- this(app, config, new CoordinatorStreamMetadataStoreFactory());
+ this(app, config, getMetadataStoreFactory(new JobConfig(config)));
}
/**
@@ -110,7 +110,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
this.isAppModeBatch = new ApplicationConfig(config).getAppMode() ==
ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = getCoordinationUtils(config,
getClass().getClassLoader());
- this.metadataStoreFactory = metadataStoreFactory;
+ this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
}
/**
@@ -121,7 +121,16 @@ public class LocalApplicationRunner implements
ApplicationRunner {
this.appDesc = appDesc;
this.isAppModeBatch = new
ApplicationConfig(appDesc.getConfig()).getAppMode() ==
ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = coordinationUtils;
- this.metadataStoreFactory = new CoordinatorStreamMetadataStoreFactory();
+ this.metadataStoreFactory =
Optional.ofNullable(getMetadataStoreFactory(new
JobConfig(appDesc.getConfig())));
+ }
+
+ static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
+ if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
+ return new CoordinatorStreamMetadataStoreFactory();
+ }
+ LOG.warn("{} or {} not configured. No coordinator stream metadata store
will be created.",
+ JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+ return null;
}
private Optional<CoordinationUtils> getCoordinationUtils(Config config,
ClassLoader classLoader) {
@@ -189,7 +198,9 @@ public class LocalApplicationRunner implements
ApplicationRunner {
jobConfigs.forEach(jobConfig -> {
LOG.debug("Starting job {} StreamProcessor with config {}",
jobConfig.getName(), jobConfig);
MetadataStore coordinatorStreamStore =
createCoordinatorStreamStore(jobConfig);
- coordinatorStreamStore.init();
+ if (coordinatorStreamStore != null) {
+ coordinatorStreamStore.init();
+ }
StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig),
Optional.ofNullable(externalContext), coordinatorStreamStore);
processors.add(Pair.of(processor, coordinatorStreamStore));
@@ -262,9 +273,12 @@ public class LocalApplicationRunner implements
ApplicationRunner {
@VisibleForTesting
MetadataStore createCoordinatorStreamStore(Config jobConfig) {
- MetadataStore coordinatorStreamStore =
- metadataStoreFactory.getMetadataStore("NoOp", jobConfig, new
MetricsRegistryMap());
- return coordinatorStreamStore;
+ if (metadataStoreFactory.isPresent()) {
+ MetadataStore coordinatorStreamStore =
+ metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new
MetricsRegistryMap());
+ return coordinatorStreamStore;
+ }
+ return null;
}
@VisibleForTesting
diff --git
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 9e63cc4..4acc8c3 100644
---
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -44,6 +44,7 @@ import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.task.IdentityStreamTask;
import org.apache.samza.zk.ZkMetadataStore;
@@ -60,6 +61,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
@@ -211,6 +213,43 @@ public class TestLocalApplicationRunner {
}
@Test
+ public void testRunCompleteWithouCoordinatorStreamStore() throws Exception {
+ Map<String, String> cfgs = new HashMap<>();
+ cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS,
UUIDGenerator.class.getName());
+ config = new MapConfig(cfgs);
+ ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) ->
mock(ProcessorLifecycleListener.class);
+ mockApp = (StreamApplication) appDesc -> {
+ appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+ };
+ prepareTest();
+
+ // return the jobConfigs from the planner
+ doReturn(Collections.singletonList(new JobConfig(new
MapConfig(config)))).when(localPlanner).prepareJobs();
+
+ StreamProcessor sp = mock(StreamProcessor.class);
+ ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory>
captor =
+
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+ doAnswer(i ->
+ {
+ ProcessorLifecycleListener listener =
captor.getValue().createInstance(sp);
+ listener.afterStart();
+ listener.afterStop();
+ return null;
+ }).when(sp).start();
+
+ ExternalContext externalContext = mock(ExternalContext.class);
+ doReturn(sp).when(runner)
+ .createStreamProcessor(anyObject(), anyObject(), captor.capture(),
eq(Optional.of(externalContext)), eq(null));
+
doReturn(null).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+ runner.run(externalContext);
+ runner.waitForFinish();
+
+ assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+ }
+
+ @Test
public void testRunFailure() throws Exception {
Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.PROCESSOR_ID, "0");
@@ -437,4 +476,24 @@ public class TestLocalApplicationRunner {
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
}
+ @Test
+ public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
+ MetadataStoreFactory metadataStoreFactory =
+ LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new
MapConfig()));
+ assertNull(metadataStoreFactory);
+ }
+
+ @Test
+ public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
+ MetadataStoreFactory metadataStoreFactory =
+ LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
+ assertNotNull(metadataStoreFactory);
+ }
+
+ @Test
+ public void testGetMetadataStoreFactoryWithDefaultSystem() {
+ MetadataStoreFactory metadataStoreFactory =
+ LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
+ assertNotNull(metadataStoreFactory);
+ }
}