Repository: samza Updated Branches: refs/heads/master 06488bf7a -> a9ff09373
SAMZA-1813: ApplicationRunner should use Planner generated configs for StreamManager Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]>, Bharath Kumarasubramanian <[email protected]> Closes #612 from prateekm/stream-manager Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a9ff0937 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a9ff0937 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a9ff0937 Branch: refs/heads/master Commit: a9ff09373306c423d3998617978ba5e30b155a8e Parents: 06488bf Author: Prateek Maheshwari <[email protected]> Authored: Tue Aug 21 10:59:18 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Tue Aug 21 10:59:18 2018 -0700 ---------------------------------------------------------------------- .../runtime/AbstractApplicationRunner.java | 20 +++++--- .../samza/runtime/LocalApplicationRunner.java | 41 ++++++++-------- .../samza/runtime/RemoteApplicationRunner.java | 36 +++++++------- .../runtime/TestLocalApplicationRunner.java | 50 ++++++++++++-------- 4 files changed, 83 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java index 7cd19fb..f7ca122 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -58,12 +58,12 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { this.graphSpec = new StreamGraphSpec(config); } - public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager streamManager) throws Exception { - return getExecutionPlan(app, null, streamManager); + public ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception { + return getExecutionPlan(app, null); } /* package private */ - ExecutionPlan getExecutionPlan(StreamApplication app, String runId, StreamManager streamManager) throws Exception { + ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception { // build stream graph app.init(graphSpec, config); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); @@ -82,8 +82,14 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { cfg.put(ApplicationConfig.APP_MODE, mode.name()); // create the physical execution plan - ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager); - return planner.plan(specGraph); + Config generatedConfig = new MapConfig(cfg); + StreamManager streamManager = buildAndStartStreamManager(generatedConfig); + try { + ExecutionPlanner planner = new ExecutionPlanner(generatedConfig, streamManager); + return planner.plan(specGraph); + } finally { + streamManager.stop(); + } } /** @@ -108,8 +114,8 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { } @VisibleForTesting - StreamManager buildAndStartStreamManager() { - StreamManager streamManager = new StreamManager(this.config); + StreamManager buildAndStartStreamManager(Config config) { + StreamManager streamManager = new StreamManager(config); streamManager.start(); return streamManager; } http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 0dcb4bf..8a9c151 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -158,32 +158,38 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication app) { - StreamManager streamManager = null; - try { - streamManager = buildAndStartStreamManager(); + try { // 1. initialize and plan - ExecutionPlan plan = getExecutionPlan(app, streamManager); + ExecutionPlan plan = getExecutionPlan(app); String executionPlanJson = plan.getPlanAsJson(); writePlanJsonFile(executionPlanJson); LOG.info("Execution Plan: \n" + executionPlanJson); - - // 2. create the necessary streams - // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391 String planId = String.valueOf(executionPlanJson.hashCode()); - createStreams(planId, plan.getIntermediateStreams(), streamManager); - // 3. create the StreamProcessors if (plan.getJobConfigs().isEmpty()) { throw new SamzaException("No jobs to run."); } + plan.getJobConfigs().forEach(jobConfig -> { - LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); - LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); - StreamProcessor processor = createStreamProcessor(jobConfig, graphSpec, listener); - listener.setProcessor(processor); - processors.add(processor); + StreamManager streamManager = null; + try { + // 2. create the necessary streams + streamManager = buildAndStartStreamManager(jobConfig); + createStreams(planId, plan.getIntermediateStreams(), streamManager); + + // 3. create the StreamProcessors + LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); + LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); + StreamProcessor processor = createStreamProcessor(jobConfig, graphSpec, listener); + listener.setProcessor(processor); + processors.add(processor); + } finally { + if (streamManager != null) { + streamManager.stop(); + } + } }); numProcessorsToStart.set(processors.size()); @@ -193,10 +199,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { appStatus = ApplicationStatus.unsuccessfulFinish(throwable); shutdownLatch.countDown(); throw new SamzaException(String.format("Failed to start application: %s.", app), throwable); - } finally { - if (streamManager != null) { - streamManager.stop(); - } } } @@ -256,11 +258,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { * stream creation. * @param planId a unique identifier representing the plan used for coordination purpose * @param intStreams list of intermediate {@link StreamSpec}s - * @throws TimeoutException exception for latch timeout */ private void createStreams(String planId, List<StreamSpec> intStreams, - StreamManager streamManager) throws TimeoutException { + StreamManager streamManager) { if (intStreams.isEmpty()) { LOG.info("Set of intermediate streams is empty. Nothing to create."); return; http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 0ecb35e..6229abc 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -61,36 +61,38 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { */ @Override public void run(StreamApplication app) { - StreamManager streamManager = null; try { - streamManager = buildAndStartStreamManager(); // TODO: run.id needs to be set for standalone: SAMZA-1531 // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); LOG.info("The run id for this run is {}", runId); // 1. initialize and plan - ExecutionPlan plan = getExecutionPlan(app, runId, streamManager); + ExecutionPlan plan = getExecutionPlan(app, runId); writePlanJsonFile(plan.getPlanAsJson()); - // 2. create the necessary streams - if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) { - streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun()); - } - streamManager.createStreams(plan.getIntermediateStreams()); - - // 3. submit jobs for remote execution plan.getJobConfigs().forEach(jobConfig -> { - LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig); - JobRunner runner = new JobRunner(jobConfig); - runner.run(true); + StreamManager streamManager = null; + try { + // 2. create the necessary streams + streamManager = buildAndStartStreamManager(jobConfig); + if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) { + streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun()); + } + streamManager.createStreams(plan.getIntermediateStreams()); + + // 3. submit jobs for remote execution + LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig); + JobRunner runner = new JobRunner(jobConfig); + runner.run(true); + } finally { + if (streamManager != null) { + streamManager.stop(); + } + } }); } catch (Throwable t) { throw new SamzaException("Failed to run application", t); - } finally { - if (streamManager != null) { - streamManager.stop(); - } } } http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 5eb139b..0335913 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -20,6 +20,7 @@ package org.apache.samza.runtime; import com.google.common.collect.ImmutableList; + import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -56,7 +57,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.doReturn; @@ -74,17 +81,18 @@ public class TestLocalApplicationRunner { @Test public void testStreamCreation() throws Exception { - Map<String, String> config = new HashMap<>(); - LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config))); + Config config = new MapConfig(new HashMap<>()); + LocalApplicationRunner runner = spy(new LocalApplicationRunner(config)); StreamApplication app = mock(StreamApplication.class); StreamManager streamManager = mock(StreamManager.class); - doReturn(streamManager).when(runner).buildAndStartStreamManager(); + doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class)); ExecutionPlan plan = mock(ExecutionPlan.class); when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"))); when(plan.getPlanAsJson()).thenReturn(""); - doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager)); + when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config))); + doReturn(plan).when(runner).getExecutionPlan(any()); CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class); JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class); @@ -109,19 +117,19 @@ public class TestLocalApplicationRunner { @Test public void testStreamCreationWithCoordination() throws Exception { - Map<String, String> config = new HashMap<>(); - LocalApplicationRunner localRunner = new LocalApplicationRunner(new MapConfig(config)); - LocalApplicationRunner runner = spy(localRunner); + Config config = new MapConfig(new HashMap<>()); + LocalApplicationRunner runner = spy(new LocalApplicationRunner(config)); StreamApplication app = mock(StreamApplication.class); StreamManager streamManager = mock(StreamManager.class); - doReturn(streamManager).when(runner).buildAndStartStreamManager(); + doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class)); ExecutionPlan plan = mock(ExecutionPlan.class); when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"))); when(plan.getPlanAsJson()).thenReturn(""); - doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager)); + when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config))); + doReturn(plan).when(runner).getExecutionPlan(any()); CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class); @@ -183,19 +191,20 @@ public class TestLocalApplicationRunner { @Test public void testRunComplete() throws Exception { - final Map<String, String> config = new HashMap<>(); - config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + HashMap<String, String> configMap = new HashMap<>(); + configMap.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + Config config = new MapConfig(configMap); LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config))); StreamApplication app = mock(StreamApplication.class); // buildAndStartStreamManager already includes start, so not going to verify it gets called StreamManager streamManager = mock(StreamManager.class); - when(runner.buildAndStartStreamManager()).thenReturn(streamManager); + when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager); ExecutionPlan plan = mock(ExecutionPlan.class); when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList()); when(plan.getPlanAsJson()).thenReturn(""); when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))); - doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager)); + doReturn(plan).when(runner).getExecutionPlan(any()); StreamProcessor sp = mock(StreamProcessor.class); ArgumentCaptor<StreamProcessorLifecycleListener> captor = @@ -221,19 +230,20 @@ public class TestLocalApplicationRunner { @Test public void testRunFailure() throws Exception { - final Map<String, String> config = new HashMap<>(); - config.put(ApplicationConfig.PROCESSOR_ID, "0"); - LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config))); + final Map<String, String> configMap = new HashMap<>(); + configMap.put(ApplicationConfig.PROCESSOR_ID, "0"); + MapConfig config = new MapConfig(configMap); + LocalApplicationRunner runner = spy(new LocalApplicationRunner(config)); StreamApplication app = mock(StreamApplication.class); // buildAndStartStreamManager already includes start, so not going to verify it gets called StreamManager streamManager = mock(StreamManager.class); - when(runner.buildAndStartStreamManager()).thenReturn(streamManager); + when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager); ExecutionPlan plan = mock(ExecutionPlan.class); when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList()); when(plan.getPlanAsJson()).thenReturn(""); - when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))); - doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager)); + when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config))); + doReturn(plan).when(runner).getExecutionPlan(any()); StreamProcessor sp = mock(StreamProcessor.class); ArgumentCaptor<StreamProcessorLifecycleListener> captor =
