Repository: samza Updated Branches: refs/heads/master 8aa75467e -> 4875842b3
SAMZA-1337: Use StreamTask with the LocalApplicationRunner Author: Boris Shkolnik <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Xinyu Liu <[email protected]>, Bharath Kumarasubramanian <[email protected]> Closes #231 from sborya/LocalAppRunnerWithStreamTask Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4875842b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4875842b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4875842b Branch: refs/heads/master Commit: 4875842b37e0285100e2d1d753a7bc4a1448e897 Parents: 8aa7546 Author: Boris Shkolnik <[email protected]> Authored: Mon Jun 26 12:25:48 2017 -0700 Committer: navina <[email protected]> Committed: Mon Jun 26 12:25:48 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/runtime/ApplicationRunner.java | 11 +++++++ .../samza/runtime/LocalApplicationRunner.java | 27 ++++++++++++++++-- .../samza/runtime/LocalContainerRunner.java | 10 +++++-- .../samza/runtime/RemoteApplicationRunner.java | 13 ++++++--- .../org/apache/samza/zk/ZkJobCoordinator.java | 3 +- .../runtime/TestAbstractApplicationRunner.java | 5 ++++ .../runtime/TestApplicationRunnerMain.java | 5 ++++ .../runtime/TestLocalApplicationRunner.java | 30 ++++++++++++++++++++ 8 files changed, 93 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java index 0586e9e..440dd33 100644 --- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java +++ 
b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java @@ -71,6 +71,17 @@ public abstract class ApplicationRunner { } /** + * Deploy and run the Samza jobs to execute {@link org.apache.samza.task.StreamTask}. + * It is non-blocking, so it does not wait for the application to finish running. + * This method assumes that task.class is specified in the configs. + * + * NOTE: this interface will most likely change in the future. + */ + @InterfaceStability.Evolving + public abstract void runTask(); + + + /** * Deploy and run the Samza jobs to execute {@link StreamApplication}. * It is non-blocking so it doesn't wait for the application running. * http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index b1f0aba..b0bfc8a 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -32,7 +32,9 @@ import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; @@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory; */ public class LocalApplicationRunner extends AbstractApplicationRunner { - private static final Logger log = LoggerFactory.getLogger(LocalApplicationRunner.class); + private static final Logger LOG = 
LoggerFactory.getLogger(LocalApplicationRunner.class); // Latch id that's used for awaiting the init of application before creating the StreamProcessors private static final String INIT_LATCH_ID = "init"; // Latch timeout is set to 10 min @@ -134,6 +136,25 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { } @Override + public void runTask() { + JobConfig jobConfig = new JobConfig(this.config); + + // validation + String taskName = new TaskConfig(config).getTaskClass().getOrElse(null); + if (taskName == null) { + throw new SamzaException("Neither APP nor task.class are defined defined"); + } + LOG.info("LocalApplicationRunner will run " + taskName); + LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); + + StreamProcessor processor = createStreamProcessor(jobConfig, null, listener); + + numProcessorsToStart.set(1); + listener.setProcessor(processor); + processor.start(); + } + + @Override public void run(StreamApplication app) { try { // 1. 
initialize and plan @@ -148,7 +169,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { throw new SamzaException("No jobs to run."); } plan.getJobConfigs().forEach(jobConfig -> { - log.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); + LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); StreamProcessor processor = createStreamProcessor(jobConfig, app, listener); listener.setProcessor(processor); @@ -180,7 +201,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { try { shutdownLatch.await(); } catch (Exception e) { - log.error("Wait is interrupted by exception", e); + LOG.error("Wait is interrupted by exception", e); throw new SamzaException(e); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index d690c80..5d0e455 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -19,6 +19,8 @@ package org.apache.samza.runtime; +import java.util.HashMap; +import java.util.Random; import org.apache.log4j.MDC; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; @@ -41,9 +43,6 @@ import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Random; - /** * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. 
It is an intermediate step to * have a local runner for yarn before we consolidate the Yarn container and coordination into a @@ -68,6 +67,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner { } @Override + public void runTask() { + throw new UnsupportedOperationException("Running StreamTask is not implemented for LocalContainerRunner"); + } + + @Override public void run(StreamApplication streamApp) { ContainerModel containerModel = jobModel.getContainers().get(containerId); Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 309d8c8..53cd2f6 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -35,12 +35,17 @@ import org.slf4j.LoggerFactory; */ public class RemoteApplicationRunner extends AbstractApplicationRunner { - private static final Logger log = LoggerFactory.getLogger(RemoteApplicationRunner.class); + private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class); public RemoteApplicationRunner(Config config) { super(config); } + @Override + public void runTask() { + throw new UnsupportedOperationException("Running StreamTask is not implemented for RemoteReplicationRunner"); + } + /** * Run the {@link StreamApplication} on the remote cluster * @param app a StreamApplication @@ -57,7 +62,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { // 3. 
submit jobs for remote execution plan.getJobConfigs().forEach(jobConfig -> { - log.info("Starting job {} with config {}", jobConfig.getName(), jobConfig); + LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig); JobRunner runner = new JobRunner(jobConfig); runner.run(true); }); @@ -72,7 +77,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { ExecutionPlan plan = getExecutionPlan(app); plan.getJobConfigs().forEach(jobConfig -> { - log.info("Killing job {}", jobConfig.getName()); + LOG.info("Killing job {}", jobConfig.getName()); JobRunner runner = new JobRunner(jobConfig); runner.kill(); }); @@ -92,7 +97,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { for (JobConfig jobConfig : plan.getJobConfigs()) { JobRunner runner = new JobRunner(jobConfig); ApplicationStatus status = runner.status(); - log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); + LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); switch (status.getStatusCode()) { case New: http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index a0558ef..cb32252 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -21,6 +21,7 @@ package org.apache.samza.zk; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; @@ -201,7 +202,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener { ApplicationConfig appConfig = new ApplicationConfig(config); if (appConfig.getProcessorId() != null) { return appConfig.getProcessorId(); - } else if (appConfig.getAppProcessorIdGeneratorClass() != null) { + } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) { ProcessorIdGenerator idGenerator = ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class); return idGenerator.generateProcessorId(config); http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java index aaacd6e..ed13b5b 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java @@ -368,6 +368,11 @@ public class TestAbstractApplicationRunner { } @Override + public void runTask() { + throw new UnsupportedOperationException("runTask is not supported in this test"); + } + + @Override public void run(StreamApplication streamApp) { // do nothing. We're only testing the stream creation methods at this point. 
} http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java index 05f3cc2..d22fbae 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java @@ -87,6 +87,11 @@ public class TestApplicationRunnerMain { } @Override + public void runTask() { + throw new UnsupportedOperationException("runTask() not supported in this test"); + } + + @Override public void run(StreamApplication streamApp) { runCount++; } http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 9d15211..a04bd3b 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -23,6 +23,7 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; @@ -205,6 +206,35 @@ public class TestLocalApplicationRunner { } @Test + public void testRunStreamTask() throws Exception { + final Map<String, 
String> config = new HashMap<>(); + config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + config.put(TaskConfig.TASK_CLASS(), "org.apache.samza.test.processor.IdentityStreamTask"); + + + LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config)); + + StreamProcessor sp = mock(StreamProcessor.class); + ArgumentCaptor<StreamProcessorLifecycleListener> captor = + ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class); + + doAnswer(i -> + { + StreamProcessorLifecycleListener listener = captor.getValue(); + listener.onStart(); + listener.onShutdown(); + return null; + }).when(sp).start(); + + LocalApplicationRunner spy = spy(runner); + doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture()); + + spy.runTask(); + + assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null)); + + } + @Test public void testRunComplete() throws Exception { final Map<String, String> config = new HashMap<>(); config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
