Repository: samza
Updated Branches:
  refs/heads/master 06488bf7a -> a9ff09373


SAMZA-1813: ApplicationRunner should use Planner generated configs for StreamManager

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>, Bharath Kumarasubramanian <[email protected]>

Closes #612 from prateekm/stream-manager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a9ff0937
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a9ff0937
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a9ff0937

Branch: refs/heads/master
Commit: a9ff09373306c423d3998617978ba5e30b155a8e
Parents: 06488bf
Author: Prateek Maheshwari <[email protected]>
Authored: Tue Aug 21 10:59:18 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Tue Aug 21 10:59:18 2018 -0700

----------------------------------------------------------------------
 .../runtime/AbstractApplicationRunner.java      | 20 +++++---
 .../samza/runtime/LocalApplicationRunner.java   | 41 ++++++++--------
 .../samza/runtime/RemoteApplicationRunner.java  | 36 +++++++-------
 .../runtime/TestLocalApplicationRunner.java     | 50 ++++++++++++--------
 4 files changed, 83 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 7cd19fb..f7ca122 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -58,12 +58,12 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
     this.graphSpec = new StreamGraphSpec(config);
   }
 
-  public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager 
streamManager) throws Exception {
-    return getExecutionPlan(app, null, streamManager);
+  public ExecutionPlan getExecutionPlan(StreamApplication app) throws 
Exception {
+    return getExecutionPlan(app, null);
   }
 
   /* package private */
-  ExecutionPlan getExecutionPlan(StreamApplication app, String runId, 
StreamManager streamManager) throws Exception {
+  ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws 
Exception {
     // build stream graph
     app.init(graphSpec, config);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
@@ -82,8 +82,14 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
 
     // create the physical execution plan
-    ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), 
streamManager);
-    return planner.plan(specGraph);
+    Config generatedConfig = new MapConfig(cfg);
+    StreamManager streamManager = buildAndStartStreamManager(generatedConfig);
+    try {
+      ExecutionPlanner planner = new ExecutionPlanner(generatedConfig, 
streamManager);
+      return planner.plan(specGraph);
+    } finally {
+      streamManager.stop();
+    }
   }
 
   /**
@@ -108,8 +114,8 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
   }
 
   @VisibleForTesting
-  StreamManager buildAndStartStreamManager() {
-    StreamManager streamManager = new StreamManager(this.config);
+  StreamManager buildAndStartStreamManager(Config config) {
+    StreamManager streamManager = new StreamManager(config);
     streamManager.start();
     return streamManager;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 0dcb4bf..8a9c151 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -158,32 +158,38 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication app) {
-    StreamManager streamManager = null;
-    try {
-      streamManager = buildAndStartStreamManager();
 
+    try {
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app, streamManager);
+      ExecutionPlan plan = getExecutionPlan(app);
 
       String executionPlanJson = plan.getPlanAsJson();
       writePlanJsonFile(executionPlanJson);
       LOG.info("Execution Plan: \n" + executionPlanJson);
-
-      // 2. create the necessary streams
-      // TODO: System generated intermediate streams should have robust naming 
scheme. See SAMZA-1391
       String planId = String.valueOf(executionPlanJson.hashCode());
-      createStreams(planId, plan.getIntermediateStreams(), streamManager);
 
-      // 3. create the StreamProcessors
       if (plan.getJobConfigs().isEmpty()) {
         throw new SamzaException("No jobs to run.");
       }
+
       plan.getJobConfigs().forEach(jobConfig -> {
-          LOG.debug("Starting job {} StreamProcessor with config {}", 
jobConfig.getName(), jobConfig);
-          LocalStreamProcessorLifeCycleListener listener = new 
LocalStreamProcessorLifeCycleListener();
-          StreamProcessor processor = createStreamProcessor(jobConfig, 
graphSpec, listener);
-          listener.setProcessor(processor);
-          processors.add(processor);
+          StreamManager streamManager = null;
+          try {
+            // 2. create the necessary streams
+            streamManager = buildAndStartStreamManager(jobConfig);
+            createStreams(planId, plan.getIntermediateStreams(), 
streamManager);
+
+            // 3. create the StreamProcessors
+            LOG.debug("Starting job {} StreamProcessor with config {}", 
jobConfig.getName(), jobConfig);
+            LocalStreamProcessorLifeCycleListener listener = new 
LocalStreamProcessorLifeCycleListener();
+            StreamProcessor processor = createStreamProcessor(jobConfig, 
graphSpec, listener);
+            listener.setProcessor(processor);
+            processors.add(processor);
+          } finally {
+            if (streamManager != null) {
+              streamManager.stop();
+            }
+          }
         });
       numProcessorsToStart.set(processors.size());
 
@@ -193,10 +199,6 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
       appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
       shutdownLatch.countDown();
       throw new SamzaException(String.format("Failed to start application: 
%s.", app), throwable);
-    } finally {
-      if (streamManager != null) {
-        streamManager.stop();
-      }
     }
   }
 
@@ -256,11 +258,10 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
    * stream creation.
    * @param planId a unique identifier representing the plan used for 
coordination purpose
    * @param intStreams list of intermediate {@link StreamSpec}s
-   * @throws TimeoutException exception for latch timeout
    */
   private void createStreams(String planId,
       List<StreamSpec> intStreams,
-      StreamManager streamManager) throws TimeoutException {
+      StreamManager streamManager) {
     if (intStreams.isEmpty()) {
       LOG.info("Set of intermediate streams is empty. Nothing to create.");
       return;

http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 0ecb35e..6229abc 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -61,36 +61,38 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
    */
   @Override
   public void run(StreamApplication app) {
-    StreamManager streamManager = null;
     try {
-      streamManager = buildAndStartStreamManager();
       // TODO: run.id needs to be set for standalone: SAMZA-1531
       // run.id is based on current system time with the most significant bits 
in UUID (8 digits) to avoid collision
       String runId = String.valueOf(System.currentTimeMillis()) + "-" + 
UUID.randomUUID().toString().substring(0, 8);
       LOG.info("The run id for this run is {}", runId);
 
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app, runId, streamManager);
+      ExecutionPlan plan = getExecutionPlan(app, runId);
       writePlanJsonFile(plan.getPlanAsJson());
 
-      // 2. create the necessary streams
-      if (plan.getApplicationConfig().getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH) {
-        streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
-      }
-      streamManager.createStreams(plan.getIntermediateStreams());
-
-      // 3. submit jobs for remote execution
       plan.getJobConfigs().forEach(jobConfig -> {
-          LOG.info("Starting job {} with config {}", jobConfig.getName(), 
jobConfig);
-          JobRunner runner = new JobRunner(jobConfig);
-          runner.run(true);
+          StreamManager streamManager = null;
+          try {
+            // 2. create the necessary streams
+            streamManager = buildAndStartStreamManager(jobConfig);
+            if (plan.getApplicationConfig().getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH) {
+              
streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
+            }
+            streamManager.createStreams(plan.getIntermediateStreams());
+
+            // 3. submit jobs for remote execution
+            LOG.info("Starting job {} with config {}", jobConfig.getName(), 
jobConfig);
+            JobRunner runner = new JobRunner(jobConfig);
+            runner.run(true);
+          } finally {
+            if (streamManager != null) {
+              streamManager.stop();
+            }
+          }
         });
     } catch (Throwable t) {
       throw new SamzaException("Failed to run application", t);
-    } finally {
-      if (streamManager != null) {
-        streamManager.stop();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a9ff0937/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 5eb139b..0335913 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -20,6 +20,7 @@
 package org.apache.samza.runtime;
 
 import com.google.common.collect.ImmutableList;
+
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,7 +57,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
 
 
@@ -74,17 +81,18 @@ public class TestLocalApplicationRunner {
   @Test
   public void testStreamCreation()
       throws Exception {
-    Map<String, String> config = new HashMap<>();
-    LocalApplicationRunner runner = spy(new LocalApplicationRunner(new 
MapConfig(config)));
+    Config config = new MapConfig(new HashMap<>());
+    LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
     StreamApplication app = mock(StreamApplication.class);
 
     StreamManager streamManager = mock(StreamManager.class);
-    doReturn(streamManager).when(runner).buildAndStartStreamManager();
+    
doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class));
 
     ExecutionPlan plan = mock(ExecutionPlan.class);
     
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
     when(plan.getPlanAsJson()).thenReturn("");
-    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(config)));
+    doReturn(plan).when(runner).getExecutionPlan(any());
 
     CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
     JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
@@ -109,19 +117,19 @@ public class TestLocalApplicationRunner {
   @Test
   public void testStreamCreationWithCoordination()
       throws Exception {
-    Map<String, String> config = new HashMap<>();
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(new 
MapConfig(config));
-    LocalApplicationRunner runner = spy(localRunner);
+    Config config = new MapConfig(new HashMap<>());
+    LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
 
     StreamApplication app = mock(StreamApplication.class);
 
     StreamManager streamManager = mock(StreamManager.class);
-    doReturn(streamManager).when(runner).buildAndStartStreamManager();
+    
doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class));
 
     ExecutionPlan plan = mock(ExecutionPlan.class);
     
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
     when(plan.getPlanAsJson()).thenReturn("");
-    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(config)));
+    doReturn(plan).when(runner).getExecutionPlan(any());
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
     CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
@@ -183,19 +191,20 @@ public class TestLocalApplicationRunner {
   @Test
   public void testRunComplete()
       throws Exception {
-    final Map<String, String> config = new HashMap<>();
-    config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
+    Config config = new MapConfig(configMap);
     LocalApplicationRunner runner = spy(new LocalApplicationRunner(new 
MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
 
     // buildAndStartStreamManager already includes start, so not going to 
verify it gets called
     StreamManager streamManager = mock(StreamManager.class);
-    when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
+    
when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager);
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
     when(plan.getPlanAsJson()).thenReturn("");
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
+    doReturn(plan).when(runner).getExecutionPlan(any());
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -221,19 +230,20 @@ public class TestLocalApplicationRunner {
   @Test
   public void testRunFailure()
       throws Exception {
-    final Map<String, String> config = new HashMap<>();
-    config.put(ApplicationConfig.PROCESSOR_ID, "0");
-    LocalApplicationRunner runner = spy(new LocalApplicationRunner(new 
MapConfig(config)));
+    final Map<String, String> configMap = new HashMap<>();
+    configMap.put(ApplicationConfig.PROCESSOR_ID, "0");
+    MapConfig config = new MapConfig(configMap);
+    LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
     StreamApplication app = mock(StreamApplication.class);
 
     // buildAndStartStreamManager already includes start, so not going to 
verify it gets called
     StreamManager streamManager = mock(StreamManager.class);
-    when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
+    
when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager);
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
     when(plan.getPlanAsJson()).thenReturn("");
-    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(config)));
+    doReturn(plan).when(runner).getExecutionPlan(any());
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =

Reply via email to