This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e0b5a32b1 SAMZA-2298: Fix CoordinatorStreamStore creation for
LocalApplicationRunner (#1136)
e0b5a32b1 is described below
commit e0b5a32b1ca542f0409142c667bf0bcec68d5cb3
Author: Daniel Nishimura <[email protected]>
AuthorDate: Fri Aug 30 10:26:06 2019 -0700
SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner
(#1136)
---
.../samza/runtime/LocalApplicationRunner.java | 66 +++++++++++++---
.../samza/runtime/TestLocalApplicationRunner.java | 90 ++++++++++++++++++++--
2 files changed, 140 insertions(+), 16 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 9337d9b..cdf1a33 100644
---
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -57,10 +57,15 @@ import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,7 +101,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
* @param config configuration for the application
*/
public LocalApplicationRunner(SamzaApplication app, Config config) {
- this(app, config, getMetadataStoreFactory(new JobConfig(config)));
+ this(app, config, getDefaultCoordinatorStreamStoreFactory(new
JobConfig(config)));
}
/**
@@ -121,15 +126,22 @@ public class LocalApplicationRunner implements
ApplicationRunner {
this.appDesc = appDesc;
this.isAppModeBatch = new
ApplicationConfig(appDesc.getConfig()).getAppMode() ==
ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = coordinationUtils;
- this.metadataStoreFactory =
Optional.ofNullable(getMetadataStoreFactory(new
JobConfig(appDesc.getConfig())));
+ this.metadataStoreFactory =
Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new
JobConfig(appDesc.getConfig())));
}
- static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
- if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
+ static MetadataStoreFactory
getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
+ String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
+ JobCoordinatorConfig jobCoordinatorConfig = new
JobCoordinatorConfig(jobConfig);
+ String jobCoordinatorFactoryClassName =
jobCoordinatorConfig.getJobCoordinatorFactoryClassName();
+
+ // TODO: Remove restriction to only ZkJobCoordinator after next phase of
metadata store abstraction.
+ if (StringUtils.isNotBlank(coordinatorSystemName) &&
ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName))
{
return new CoordinatorStreamMetadataStoreFactory();
}
- LOG.warn("{} or {} not configured. No coordinator stream metadata store
will be created.",
- JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+
+ LOG.warn("{} or {} not configured, or {} is not {}. No default coordinator
stream metadata store will be created.",
+ JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM,
+ JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
ZkJobCoordinatorFactory.class.getName());
return null;
}
@@ -272,16 +284,50 @@ public class LocalApplicationRunner implements
ApplicationRunner {
}
@VisibleForTesting
- MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+ MetadataStore createCoordinatorStreamStore(Config config) {
if (metadataStoreFactory.isPresent()) {
- MetadataStore coordinatorStreamStore =
- metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new
MetricsRegistryMap());
- return coordinatorStreamStore;
+ // TODO: Add missing metadata store abstraction for creating the
underlying store to address SAMZA-2182
+ if (metadataStoreFactory.get() instanceof
CoordinatorStreamMetadataStoreFactory) {
+ if (createUnderlyingCoordinatorStream(config)) {
+ MetadataStore coordinatorStreamStore =
+ metadataStoreFactory.get().getMetadataStore("NoOp", config, new
MetricsRegistryMap());
+ LOG.info("Created coordinator stream store of type: {}",
coordinatorStreamStore.getClass().getSimpleName());
+ return coordinatorStreamStore;
+ }
+ } else {
+ MetadataStore otherMetadataStore =
+ metadataStoreFactory.get().getMetadataStore("NoOp", config, new
MetricsRegistryMap());
+ LOG.info("Created alternative coordinator stream store of type: {}",
otherMetadataStore.getClass().getSimpleName());
+ return otherMetadataStore;
+ }
}
+
+ LOG.warn("No coordinator stream store created.");
return null;
}
@VisibleForTesting
+ boolean createUnderlyingCoordinatorStream(Config config) {
+ // TODO: This work around method is necessary due to SAMZA-2182 - Metadata
store: disconnect between creation and usage of the underlying storage
+ // and will be addressed in the next phase of metadata store abstraction
+ if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
+ LOG.warn("{} or {} not configured. Coordinator stream not created.",
+ JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+ return false;
+ }
+ SystemStream coordinatorSystemStream =
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+ SystemAdmins systemAdmins = new SystemAdmins(config);
+ systemAdmins.start();
+ try {
+ SystemAdmin coordinatorSystemAdmin =
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+ CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream,
coordinatorSystemAdmin);
+ } finally {
+ systemAdmins.stop();
+ }
+ return true;
+ }
+
+ @VisibleForTesting
StreamProcessor createStreamProcessor(Config config,
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,
Optional<ExternalContext> externalContextOptional, MetadataStore
coordinatorStreamStore) {
diff --git
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 4acc8c3..71100c4 100644
---
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -35,17 +35,25 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.coordinator.ClusterMembership;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.DistributedLock;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.InMemoryMetadataStore;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.SystemAdmins;
import org.apache.samza.task.IdentityStreamTask;
import org.apache.samza.zk.ZkMetadataStore;
import org.apache.samza.zk.ZkMetadataStoreFactory;
@@ -476,24 +484,94 @@ public class TestLocalApplicationRunner {
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
}
+ /**
+ * Default metadata store factory should be null if no job coordinator
system defined and the default
+ * ZkJobCoordinator is used.
+ */
@Test
- public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
+ public void
testGetCoordinatorStreamStoreFactoryWithoutJobCoordinatorSystem() {
MetadataStoreFactory metadataStoreFactory =
- LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new
MapConfig()));
+ LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new
JobConfig(new MapConfig()));
assertNull(metadataStoreFactory);
}
+ /**
+ * Default metadata store factory should not be null if job coordinator
system defined and the default
+ * ZkJobCoordinator is used.
+ */
@Test
- public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
+ public void testGetCoordinatorStreamStoreFactoryWithJobCoordinatorSystem() {
MetadataStoreFactory metadataStoreFactory =
- LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
+ LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new
JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM,
"test-system"))));
assertNotNull(metadataStoreFactory);
}
+ /**
+ * Default metadata store factory should not be null if default system
defined and the default
+ * ZkJobCoordinator is used.
+ */
@Test
- public void testGetMetadataStoreFactoryWithDefaultSystem() {
+ public void testGetCoordinatorStreamStoreFactoryWithDefaultSystem() {
MetadataStoreFactory metadataStoreFactory =
- LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
+ LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new
JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM,
"test-system"))));
assertNotNull(metadataStoreFactory);
}
+
+ /**
+ * Default metadata store factory be null if job coordinator system or
default system defined and a non ZkJobCoordinator
+ * job coordinator is used.
+ */
+ @Test
+ public void testGetCoordinatorStreamStoreFactoryWithNonZkJobCoordinator() {
+ MapConfig mapConfig = new MapConfig(
+ ImmutableMap.of(
+ JobConfig.JOB_DEFAULT_SYSTEM, "test-system",
+ JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName()));
+ MetadataStoreFactory metadataStoreFactory =
+ LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new
JobConfig(mapConfig));
+ assertNull(metadataStoreFactory);
+ }
+
+ /**
+ * Underlying coordinator stream should be created if using
CoordinatorStreamMetadataStoreFactory
+ * @throws Exception
+ */
+ @Test
+ public void testCreateCoordinatorStreamWithCoordinatorFactory() throws
Exception {
+ CoordinatorStreamStore coordinatorStreamStore =
mock(CoordinatorStreamStore.class);
+ CoordinatorStreamMetadataStoreFactory
coordinatorStreamMetadataStoreFactory =
mock(CoordinatorStreamMetadataStoreFactory.class);
+
doReturn(coordinatorStreamStore).when(coordinatorStreamMetadataStoreFactory).getMetadataStore(anyString(),
any(Config.class), any(
+ MetricsRegistry.class));
+ SystemAdmins systemAdmins = mock(SystemAdmins.class);
+
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
+ LocalApplicationRunner localApplicationRunner =
+ spy(new LocalApplicationRunner(mockApp, config,
coordinatorStreamMetadataStoreFactory));
+
+ // create store only if successful in creating the underlying coordinator
stream
+
doReturn(true).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+ assertEquals(coordinatorStreamStore,
localApplicationRunner.createCoordinatorStreamStore(config));
+
verify(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+
+ // do not create store if creating the underlying coordinator stream fails
+
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+ assertNull(localApplicationRunner.createCoordinatorStreamStore(config));
+ }
+
+ /**
+ * Underlying coordinator stream should not be created if not using
CoordinatorStreamMetadataStoreFactory
+ * @throws Exception
+ */
+ @Test
+ public void testCreateCoordinatorStreamWithoutCoordinatorFactory() throws
Exception {
+ SystemAdmins systemAdmins = mock(SystemAdmins.class);
+
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
+ LocalApplicationRunner localApplicationRunner =
+ spy(new LocalApplicationRunner(mockApp, config, new
InMemoryMetadataStoreFactory()));
+
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
+ MetadataStore coordinatorStreamStore =
localApplicationRunner.createCoordinatorStreamStore(config);
+ assertTrue(coordinatorStreamStore instanceof InMemoryMetadataStore);
+
+ // creating underlying coordinator stream should not be called for other
coordinator stream metadata store types.
+ verify(localApplicationRunner,
never()).createUnderlyingCoordinatorStream(eq(config));
+ }
}