Repository: samza
Updated Branches:
  refs/heads/master a4ac1791f -> 7fc39e521


SAMZA-1709: Use lazy creation for SystemAdmins in ApplicationRunners

An instance of SystemAdmins is created when instantiating any 
AbstractApplicationRunner, but the SystemAdmins is only actually needed for 
some of the methods for some of the runners. For example, 
LocalApplicationRunner.kill does not need SystemAdmins, and 
LocalContainerRunner does not need SystemAdmins for anything.
Doing lazy instantiation allows us to more easily manage the SystemAdmins 
lifecycle, since it removes the need to add lifecycle hooks for the 
ApplicationRunner.
This also fixes the lifecycle management for SystemAdmins in ApplicationRunners.

Author: Cameron Lee <ca...@linkedin.com>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #512 from cameronlee314/runnner_system_admins


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7fc39e52
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7fc39e52
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7fc39e52

Branch: refs/heads/master
Commit: 7fc39e521ace845ac7a3af79b84e9710f09581cd
Parents: a4ac179
Author: Cameron Lee <ca...@linkedin.com>
Authored: Wed Jun 13 16:12:44 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Jun 13 16:12:44 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/execution/StreamManager.java   | 16 ++++-
 .../runtime/AbstractApplicationRunner.java      | 47 +++++--------
 .../samza/runtime/LocalApplicationRunner.java   | 22 ++++--
 .../samza/runtime/RemoteApplicationRunner.java  | 34 ++++++---
 .../runtime/TestApplicationRunnerMain.java      |  2 -
 .../runtime/TestLocalApplicationRunner.java     | 22 ++++--
 .../sql/runner/SamzaSqlApplicationRunner.java   |  2 -
 ...StreamApplicationIntegrationTestHarness.java | 74 +++++++++++++-------
 .../operator/TestRepartitionJoinWindowApp.java  |  4 +-
 9 files changed, 138 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index b0473c1..7f60f96 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.execution;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Collection;
@@ -49,7 +50,12 @@ public class StreamManager {
 
   private final SystemAdmins systemAdmins;
 
-  public StreamManager(SystemAdmins systemAdmins) {
+  public StreamManager(Config config) {
+    this(new SystemAdmins(config));
+  }
+
+  @VisibleForTesting
+  StreamManager(SystemAdmins systemAdmins) {
     this.systemAdmins = systemAdmins;
   }
 
@@ -70,6 +76,14 @@ public class StreamManager {
     }
   }
 
+  public void start() {
+    this.systemAdmins.start();
+  }
+
+  public void stop() {
+    this.systemAdmins.stop();
+  }
+
   Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> 
streamNames) {
     Map<String, Integer> streamToPartitionCount = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 5043977..3716d2b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -18,6 +18,13 @@
  */
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -32,17 +39,9 @@ import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemAdmins;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 
 /**
  * Defines common, core behavior for implementations of the {@link 
ApplicationRunner} API.
@@ -50,9 +49,6 @@ import java.util.Set;
 public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private static final Logger log = 
LoggerFactory.getLogger(AbstractApplicationRunner.class);
 
-  private final StreamManager streamManager;
-  private final SystemAdmins systemAdmins;
-
   /**
    * The {@link ApplicationRunner} is supposed to run a single {@link 
StreamApplication} instance in the full life-cycle
    */
@@ -61,8 +57,6 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
   public AbstractApplicationRunner(Config config) {
     super(config);
     this.graphSpec = new StreamGraphSpec(this, config);
-    this.systemAdmins = new SystemAdmins(config);
-    this.streamManager = new StreamManager(systemAdmins);
   }
 
   @Override
@@ -72,16 +66,6 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
     return getStreamSpec(streamId, physicalName);
   }
 
-  @Override
-  public void run(StreamApplication streamApp) {
-    systemAdmins.start();
-  }
-
-  @Override
-  public void kill(StreamApplication streamApp) {
-    systemAdmins.stop();
-  }
-
   /**
    * Constructs a {@link StreamSpec} from the configuration for the specified 
streamId.
    *
@@ -126,12 +110,12 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
     return new StreamSpec(streamId, physicalName, system, isBounded, 
properties);
   }
 
-  public ExecutionPlan getExecutionPlan(StreamApplication app) throws 
Exception {
-    return getExecutionPlan(app, null);
+  public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager 
streamManager) throws Exception {
+    return getExecutionPlan(app, null, streamManager);
   }
 
   /* package private */
-  ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws 
Exception {
+  ExecutionPlan getExecutionPlan(StreamApplication app, String runId, 
StreamManager streamManager) throws Exception {
     // build stream graph
     app.init(graphSpec, config);
 
@@ -152,11 +136,6 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
     return planner.plan(specGraph);
   }
 
-  /* package private for testing */
-  StreamManager getStreamManager() {
-    return streamManager;
-  }
-
   /**
    * Write the execution plan JSON to a file
    * @param planJson JSON representation of the plan
@@ -178,4 +157,10 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
     }
   }
 
+  @VisibleForTesting
+  StreamManager buildAndStartStreamManager() {
+    StreamManager streamManager = new StreamManager(this.config);
+    streamManager.start();
+    return streamManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index d3df741..0dcb4bf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -42,6 +42,7 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
+import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.operators.StreamGraphSpec;
@@ -157,10 +158,12 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
-      super.run(app);
+      streamManager = buildAndStartStreamManager();
+
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app);
+      ExecutionPlan plan = getExecutionPlan(app, streamManager);
 
       String executionPlanJson = plan.getPlanAsJson();
       writePlanJsonFile(executionPlanJson);
@@ -169,7 +172,7 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
       // 2. create the necessary streams
       // TODO: System generated intermediate streams should have robust naming 
scheme. See SAMZA-1391
       String planId = String.valueOf(executionPlanJson.hashCode());
-      createStreams(planId, plan.getIntermediateStreams());
+      createStreams(planId, plan.getIntermediateStreams(), streamManager);
 
       // 3. create the StreamProcessors
       if (plan.getJobConfigs().isEmpty()) {
@@ -190,13 +193,16 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
       appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
       shutdownLatch.countDown();
       throw new SamzaException(String.format("Failed to start application: 
%s.", app), throwable);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
   @Override
   public void kill(StreamApplication streamApp) {
     processors.forEach(StreamProcessor::stop);
-    super.kill(streamApp);
   }
 
   @Override
@@ -252,7 +258,9 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
    * @param intStreams list of intermediate {@link StreamSpec}s
    * @throws TimeoutException exception for latch timeout
    */
-  /* package private */ void createStreams(String planId, List<StreamSpec> 
intStreams) throws TimeoutException {
+  private void createStreams(String planId,
+      List<StreamSpec> intStreams,
+      StreamManager streamManager) throws TimeoutException {
     if (intStreams.isEmpty()) {
       LOG.info("Set of intermediate streams is empty. Nothing to create.");
       return;
@@ -268,7 +276,7 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
       LOG.warn("Processor {} failed to create utils. Each processor will 
attempt to create streams.", uid);
       // each application process will try creating the streams, which
       // requires stream creation to be idempotent
-      getStreamManager().createStreams(intStreams);
+      streamManager.createStreams(intStreams);
       return;
     }
 
@@ -277,7 +285,7 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
       // check if the processor needs to go through leader election and stream 
creation
       if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
         LOG.info("lock acquired for streams creation by " + uid);
-        getStreamManager().createStreams(intStreams);
+        streamManager.createStreams(intStreams);
         lockWithState.unlockAndSet();
       } else {
         LOG.info("Processor {} did not obtain the lock for streams creation. 
They must've been created by another processor.", uid);

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 202fa76..99fdc51 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -20,6 +20,7 @@
 package org.apache.samza.runtime;
 
 import java.time.Duration;
+import java.util.UUID;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -27,14 +28,13 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.execution.ExecutionPlan;
+import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.UUID;
-
 import static org.apache.samza.job.ApplicationStatus.*;
 
 
@@ -61,22 +61,23 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
    */
   @Override
   public void run(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
-      super.run(app);
+      streamManager = buildAndStartStreamManager();
       // TODO: run.id needs to be set for standalone: SAMZA-1531
       // run.id is based on current system time with the most significant bits 
in UUID (8 digits) to avoid collision
       String runId = String.valueOf(System.currentTimeMillis()) + "-" + 
UUID.randomUUID().toString().substring(0, 8);
       LOG.info("The run id for this run is {}", runId);
 
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app, runId);
+      ExecutionPlan plan = getExecutionPlan(app, runId, streamManager);
       writePlanJsonFile(plan.getPlanAsJson());
 
       // 2. create the necessary streams
       if (plan.getApplicationConfig().getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH) {
-        getStreamManager().clearStreamsFromPreviousRun(getConfigFromPrevRun());
+        streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
       }
-      getStreamManager().createStreams(plan.getIntermediateStreams());
+      streamManager.createStreams(plan.getIntermediateStreams());
 
       // 3. submit jobs for remote execution
       plan.getJobConfigs().forEach(jobConfig -> {
@@ -86,33 +87,44 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
         });
     } catch (Throwable t) {
       throw new SamzaException("Failed to run application", t);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
   @Override
   public void kill(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
-      ExecutionPlan plan = getExecutionPlan(app);
+      streamManager = buildAndStartStreamManager();
+      ExecutionPlan plan = getExecutionPlan(app, streamManager);
 
       plan.getJobConfigs().forEach(jobConfig -> {
           LOG.info("Killing job {}", jobConfig.getName());
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
-      super.kill(app);
     } catch (Throwable t) {
       throw new SamzaException("Failed to kill application", t);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
   @Override
   public ApplicationStatus status(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
       boolean hasNewJobs = false;
       boolean hasRunningJobs = false;
       ApplicationStatus unsuccessfulFinishStatus = null;
 
-      ExecutionPlan plan = getExecutionPlan(app);
+      streamManager = buildAndStartStreamManager();
+      ExecutionPlan plan = getExecutionPlan(app, streamManager);
       for (JobConfig jobConfig : plan.getJobConfigs()) {
         ApplicationStatus status = getApplicationStatus(jobConfig);
 
@@ -148,6 +160,10 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
       }
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 870d586..eb0ebe9 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -93,13 +93,11 @@ public class TestApplicationRunnerMain {
 
     @Override
     public void run(StreamApplication streamApp) {
-      super.run(streamApp);
       runCount++;
     }
 
     @Override
     public void kill(StreamApplication streamApp) {
-      super.kill(streamApp);
       killCount++;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 595dda2..5eb139b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -79,12 +79,12 @@ public class TestLocalApplicationRunner {
     StreamApplication app = mock(StreamApplication.class);
 
     StreamManager streamManager = mock(StreamManager.class);
-    doReturn(streamManager).when(runner).getStreamManager();
+    doReturn(streamManager).when(runner).buildAndStartStreamManager();
 
     ExecutionPlan plan = mock(ExecutionPlan.class);
     
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
     when(plan.getPlanAsJson()).thenReturn("");
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
     JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
@@ -103,6 +103,7 @@ public class TestLocalApplicationRunner {
     List<StreamSpec> streamSpecs = captor.getValue();
     assertEquals(streamSpecs.size(), 1);
     assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
   }
 
   @Test
@@ -115,12 +116,12 @@ public class TestLocalApplicationRunner {
     StreamApplication app = mock(StreamApplication.class);
 
     StreamManager streamManager = mock(StreamManager.class);
-    doReturn(streamManager).when(runner).getStreamManager();
+    doReturn(streamManager).when(runner).buildAndStartStreamManager();
 
     ExecutionPlan plan = mock(ExecutionPlan.class);
     
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
     when(plan.getPlanAsJson()).thenReturn("");
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
     CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
@@ -147,6 +148,7 @@ public class TestLocalApplicationRunner {
     List<StreamSpec> streamSpecs = captor.getValue();
     assertEquals(streamSpecs.size(), 1);
     assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
   }
 
   @Test
@@ -186,11 +188,14 @@ public class TestLocalApplicationRunner {
     LocalApplicationRunner runner = spy(new LocalApplicationRunner(new 
MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
 
+    // buildAndStartStreamManager already includes start, so not going to 
verify it gets called
+    StreamManager streamManager = mock(StreamManager.class);
+    when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
     when(plan.getPlanAsJson()).thenReturn("");
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -210,6 +215,7 @@ public class TestLocalApplicationRunner {
     runner.waitForFinish();
 
     assertEquals(runner.status(app), ApplicationStatus.SuccessfulFinish);
+    verify(streamManager).stop();
   }
 
   @Test
@@ -220,11 +226,14 @@ public class TestLocalApplicationRunner {
     LocalApplicationRunner runner = spy(new LocalApplicationRunner(new 
MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
 
+    // buildAndStartStreamManager already includes start, so not going to 
verify it gets called
+    StreamManager streamManager = mock(StreamManager.class);
+    when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
     when(plan.getPlanAsJson()).thenReturn("");
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -245,6 +254,7 @@ public class TestLocalApplicationRunner {
     }
 
     assertEquals(runner.status(app), ApplicationStatus.UnsuccessfulFinish);
+    verify(streamManager).stop();
   }
 
   public static Set<StreamProcessor> getProcessors(LocalApplicationRunner 
runner) {

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index f3093a7..044c7cf 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -120,7 +120,6 @@ public class SamzaSqlApplicationRunner extends 
AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
-    super.run(streamApp);
     Validate.isInstanceOf(SamzaSqlApplication.class, streamApp);
     appRunner.run(streamApp);
   }
@@ -128,7 +127,6 @@ public class SamzaSqlApplicationRunner extends 
AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication streamApp) {
     appRunner.kill(streamApp);
-    super.kill(streamApp);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index 1595347..3c11533 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.execution.TestStreamManager;
@@ -101,8 +102,6 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
   private KafkaProducer producer;
   private KafkaConsumer consumer;
   protected KafkaSystemAdmin systemAdmin;
-  private StreamApplication app;
-  protected AbstractApplicationRunner runner;
 
   private int numEmptyPolls = 3;
   private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
@@ -218,25 +217,29 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
    * @param streamApplication the application to run
    * @param appName the name of the application
    * @param overriddenConfigs configs to override
+   * @return RunApplicationContext which contains objects created within 
runApplication, to be used for verification
+   * if necessary
    */
-  public void runApplication(StreamApplication streamApplication, String 
appName, Map<String, String> overriddenConfigs) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put("job.factory.class", 
"org.apache.samza.job.local.ThreadJobFactory");
-    configs.put("job.name", appName);
-    configs.put("app.class", streamApplication.getClass().getCanonicalName());
-    configs.put("serializers.registry.json.class", 
"org.apache.samza.serializers.JsonSerdeFactory");
-    configs.put("serializers.registry.string.class", 
"org.apache.samza.serializers.StringSerdeFactory");
-    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
-    configs.put("systems.kafka.samza.key.serde", "string");
-    configs.put("systems.kafka.samza.msg.serde", "string");
-    configs.put("systems.kafka.samza.offset.default", "oldest");
-    configs.put("job.coordinator.system", "kafka");
-    configs.put("job.default.system", "kafka");
-    configs.put("job.coordinator.replication.factor", "1");
-    configs.put("task.window.ms", "1000");
-    configs.put("task.checkpoint.factory", 
TestStreamManager.MockCheckpointManagerFactory.class.getName());
+  protected RunApplicationContext runApplication(StreamApplication 
streamApplication,
+      String appName,
+      Map<String, String> overriddenConfigs) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("job.factory.class", 
"org.apache.samza.job.local.ThreadJobFactory");
+    configMap.put("job.name", appName);
+    configMap.put("app.class", 
streamApplication.getClass().getCanonicalName());
+    configMap.put("serializers.registry.json.class", 
"org.apache.samza.serializers.JsonSerdeFactory");
+    configMap.put("serializers.registry.string.class", 
"org.apache.samza.serializers.StringSerdeFactory");
+    configMap.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
+    configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configMap.put("systems.kafka.samza.key.serde", "string");
+    configMap.put("systems.kafka.samza.msg.serde", "string");
+    configMap.put("systems.kafka.samza.offset.default", "oldest");
+    configMap.put("job.coordinator.system", "kafka");
+    configMap.put("job.default.system", "kafka");
+    configMap.put("job.coordinator.replication.factor", "1");
+    configMap.put("task.window.ms", "1000");
+    configMap.put("task.checkpoint.factory", 
TestStreamManager.MockCheckpointManagerFactory.class.getName());
 
     // This is to prevent tests from taking a long time to stop after they're 
done. The issue is that
     // tearDown currently doesn't call runner.kill(app), and shuts down the 
Kafka and ZK servers immediately.
@@ -247,17 +250,18 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
     // changelog streams. Hence we just force an unclean shutdown here to. 
This _should be_ OK
     // since the test method has already executed by the time the shutdown 
hook is called. The side effect is
     // that buffered state (e.g. changelog contents) might not be flushed 
correctly after the test run.
-    configs.put("task.shutdown.ms", "1");
+    configMap.put("task.shutdown.ms", "1");
 
     if (overriddenConfigs != null) {
-      configs.putAll(overriddenConfigs);
+      configMap.putAll(overriddenConfigs);
     }
 
-    app = streamApplication;
-    runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(new 
MapConfig(configs));
+    Config config = new MapConfig(configMap);
+    AbstractApplicationRunner runner = (AbstractApplicationRunner) 
ApplicationRunner.fromConfig(config);
     runner.run(streamApplication);
 
     StreamAssert.waitForComplete();
+    return new RunApplicationContext(runner, config);
   }
 
   public void setNumEmptyPolls(int numEmptyPolls) {
@@ -274,4 +278,26 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
     consumer.close();
     super.tearDown();
   }
+
+  /**
+   * Container for any necessary context created during runApplication. Allows 
tests to access objects created within
+   * runApplication in order to do verification.
+   */
+  protected static class RunApplicationContext {
+    private final AbstractApplicationRunner runner;
+    private final Config config;
+
+    private RunApplicationContext(AbstractApplicationRunner runner, Config 
config) {
+      this.runner = runner;
+      this.config = config;
+    }
+
+    public AbstractApplicationRunner getRunner() {
+      return this.runner;
+    }
+
+    public Config getConfig() {
+      return this.config;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7fc39e52/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 5424888..a2adb70 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -107,7 +107,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, 
inputTopicName2);
     configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, 
outputTopicName);
 
-    runApplication(app, appName, configs);
+    RunApplicationContext runApplicationContext = runApplication(app, appName, 
configs);
 
     // consume and validate result
     List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(outputTopicName), 2);
@@ -138,8 +138,6 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
       }
       Assert.assertEquals(0, remainingMessageNum);
     }
-
-
   }
 
   @Test

Reply via email to