Repository: samza
Updated Branches:
  refs/heads/master 1a7e27097 -> e2adf8f99


http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
deleted file mode 100644
index dc9fd27..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.e2e;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlRemoteTable {
-  @Test
-  public void testSinkEndToEndWithKey() {
-    int numMessages = 20;
-
-    RemoteStoreIOResolverTestFactory.records.clear();
-
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
-    String sql = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
-  }
-
-  @Test
-  public void testSinkEndToEndWithKeyWithNullRecords() {
-    int numMessages = 20;
-
-    RemoteStoreIOResolverTestFactory.records.clear();
-
-    Map<String, String> props = new HashMap<>();
-    Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages, 
false, true);
-
-    String sql = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals(numMessages - ((numMessages - 1) / 
TestAvroSystemFactory.NULL_RECORD_FREQUENCY + 1),
-        RemoteStoreIOResolverTestFactory.records.size());
-  }
-
-  @Test (expected = AssertionError.class)
-  public void testSinkEndToEndWithoutKey() {
-    int numMessages = 20;
-
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
-    String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) 
select id, name from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
-  }
-
-  @Test
-  public void testSourceEndToEndWithKey() {
-    int numMessages = 20;
-
-    TestAvroSystemFactory.messages.clear();
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    populateProfileTable(staticConfigs, numMessages);
-
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
-            + "       p.name as profileName, p.address as profileAddress "
-            + "from testRemoteStore.Profile.`$table` as p "
-            + "join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId";
-
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    List<String> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
-            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
-            ((GenericRecord) x.getMessage()).get("profileName").toString()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(numMessages, outMessages.size());
-    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
-    Assert.assertEquals(expectedOutMessages, outMessages);
-  }
-
-  @Test
-  public void testSourceEndToEndWithKeyWithNullForeignKeys() {
-    int numMessages = 20;
-
-    TestAvroSystemFactory.messages.clear();
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
-    populateProfileTable(staticConfigs, numMessages);
-
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
-            + "       p.name as profileName, p.address as profileAddress "
-            + "from testRemoteStore.Profile.`$table` as p "
-            + "join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId";
-
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    List<String> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
-            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
-            ((GenericRecord) x.getMessage()).get("profileName").toString()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(numMessages / 2, outMessages.size());
-    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages);
-    Assert.assertEquals(expectedOutMessages, outMessages);
-  }
-
-  @Test
-  public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() {
-    int numMessages = 20;
-
-    TestAvroSystemFactory.messages.clear();
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
-    populateProfileTable(staticConfigs, numMessages);
-
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
-            + "       p.name as profileName, p.address as profileAddress "
-            + "from testRemoteStore.Profile.`$table` as p "
-            + "right join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId";
-
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    List<String> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
-            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
-            ((GenericRecord) x.getMessage()).get("profileName").toString()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(numMessages, outMessages.size());
-    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages);
-    Assert.assertEquals(expectedOutMessages, outMessages);
-  }
-
-  @Test
-  public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
-    int numMessages = 21;
-
-    TestAvroSystemFactory.messages.clear();
-    RemoteStoreIOResolverTestFactory.records.clear();
-    Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
-    populateProfileTable(staticConfigs, numMessages);
-
-    // The below query reads messages from a stream and deletes the corresponding records from the table.
-    // Since the stream has alternate messages with null foreign key, only half of the messages will have
-    // successful joins and hence only half of the records in the table will be deleted. Although join is
-    // redundant here, keeping it just for testing purpose.
-    String sql =
-        "Insert into testRemoteStore.Profile.`$table` "
-            + "select p.__key__ as __key__ "
-            + "from testRemoteStore.Profile.`$table` as p "
-            + "join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId ";
-
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals((numMessages + 1) / 2, 
RemoteStoreIOResolverTestFactory.records.size());
-  }
-
-  private void populateProfileTable(Map<String, String> staticConfigs, int 
numMessages) {
-    RemoteStoreIOResolverTestFactory.records.clear();
-
-    String sql = "Insert into testRemoteStore.Profile.`$table` select * from 
testavro.PROFILE";
-    List<String> sqlStmts = Arrays.asList(sql);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index cada93d..e1ca2e6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
@@ -42,6 +43,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.KV;
 import org.apache.samza.runtime.LocalApplicationRunner;
@@ -264,8 +266,9 @@ public class TestRunner {
     Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), 
"Timeouts should be positive");
     // Cleaning store directories to ensure current run does not pick up state from previous run
     deleteStoreDirectories();
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new 
MapConfig(configs));
-    runner.run();
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, 
config);
+    runner.run(buildExternalContext(config).orElse(null));
     if (!runner.waitForFinish(timeout)) {
       throw new SamzaException("Timed out waiting for application to finish");
     }
@@ -410,4 +413,13 @@ public class TestRunner {
     this.configs.put(keySerdeConfigKey, null);
     this.configs.put(msgSerdeConfigKey, null);
   }
+
+  private static Optional<ExternalContext> buildExternalContext(Config config) 
{
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
+     * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components
+     * like the application descriptor may not be available.
+     */
+    return Optional.empty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
index 34b264f..ed73725 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
@@ -19,10 +19,12 @@
 
 package org.apache.samza.test.integration;
 
+import java.util.Optional;
 import joptsimple.OptionSet;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.runtime.ApplicationRunnerMain;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
@@ -51,10 +53,19 @@ public class LocalApplicationRunnerMain {
 
     try {
       LOGGER.info("Launching stream application: {} to run.", app);
-      runner.run();
+      runner.run(buildExternalContext(config).orElse(null));
       runner.waitForFinish();
     } catch (Exception e) {
       LOGGER.error("Exception occurred when running application: {}.", app, e);
     }
   }
+
+  private static Optional<ExternalContext> buildExternalContext(Config config) 
{
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
+     * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components
+     * like the application descriptor may not be available.
+     */
+    return Optional.empty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 6f381e2..45b5668 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
@@ -109,9 +110,11 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
       }
     }
 
-    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new PipelineApplication(), new 
MapConfig(configs));
+    Config config = new MapConfig(configs);
+    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new PipelineApplication(),
+        config);
 
-    runner.run();
+    executeRun(runner, config);
     runner.waitForFinish();
 
     assertEquals(received.size(), count * partitionCount);

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 74c32b4..12b6f6d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -164,8 +164,9 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
       }
     }
 
-    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new TestStreamApp(), new 
MapConfig(configs));
-    runner.run();
+    Config config = new MapConfig(configs);
+    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new TestStreamApp(), config);
+    executeRun(runner, config);
 
     // processors are only available when the app is running
     Map<String, StreamOperatorTask> tasks = 
getTaskOperationGraphs((MockLocalApplicationRunner) runner);

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 0c3d755..16953f0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -255,7 +255,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
 
     Config config = new MapConfig(configMap);
     ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(streamApplication, config);
-    runner.run();
+    executeRun(runner, config);
 
     MessageStreamAssert.waitForComplete();
     return new RunApplicationContext(runner, config);

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 6faf80b..e2b458e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -263,7 +263,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
       if (hasSecondProcessorJoined.compareAndSet(false, true)) {
         previousJobModelVersion[0] = zkUtils.getJobModelVersion();
         previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
-        appRunner2.run();
+        executeRun(appRunner2, localTestConfig2);
         try {
           // Wait for appRunner2 to register with zookeeper.
           secondProcessorRegistered.await();
@@ -279,7 +279,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
         TEST_SYSTEM, inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic, null,
         callback, kafkaEventsConsumedLatch, localTestConfig1), 
localTestConfig1);
-    appRunner1.run();
+    executeRun(appRunner1, localTestConfig1);
 
     kafkaEventsConsumedLatch.await();
 
@@ -345,7 +345,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
       if (hasSecondProcessorJoined.compareAndSet(false, true)) {
         previousJobModelVersion[0] = zkUtils.getJobModelVersion();
         previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
-        appRunner2.run();
+        executeRun(appRunner2, testAppConfig2);
         try {
           // Wait for appRunner2 to register with zookeeper.
           secondProcessorRegistered.await();
@@ -363,7 +363,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
         TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, 
streamApplicationCallback,
         kafkaEventsConsumedLatch, testAppConfig1), testAppConfig1);
-    appRunner1.run();
+    executeRun(appRunner1, testAppConfig1);
 
     kafkaEventsConsumedLatch.await();
 
@@ -424,8 +424,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch3, null, kafkaEventsConsumedLatch,
         applicationConfig3), applicationConfig3);
 
-    appRunner1.run();
-    appRunner2.run();
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
 
     // Wait until all processors have processed a message.
     processedMessagesLatch1.await();
@@ -451,7 +451,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     kafkaEventsConsumedLatch.await();
     publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, 
PROCESSOR_IDS[0]);
 
-    appRunner3.run();
+    executeRun(appRunner3, applicationConfig3);
     processedMessagesLatch3.await();
 
     // Verifications after killing the leader.
@@ -489,8 +489,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         applicationConfig2), applicationConfig2);
 
     // Run stream applications.
-    appRunner1.run();
-    appRunner2.run();
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
 
     // Wait for message processing to run in both the processors.
     processedMessagesLatch1.await();
@@ -505,7 +505,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Fail when the duplicate processor joins.
     expectedException.expect(SamzaException.class);
     try {
-      appRunner3.run();
+      executeRun(appRunner3, applicationConfig2);
     } finally {
       appRunner1.kill();
       appRunner2.kill();
@@ -545,8 +545,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         applicationConfig2), applicationConfig2);
 
     // Run stream application.
-    appRunner1.run();
-    appRunner2.run();
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
 
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
@@ -571,7 +571,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     ApplicationRunner appRunner3 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
         TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch,
         applicationConfig1), applicationConfig1);
-    appRunner3.run();
+    executeRun(appRunner3, applicationConfig1);
 
     processedMessagesLatch1.await();
 
@@ -615,8 +615,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch2, null, kafkaEventsConsumedLatch,
         applicationConfig2), applicationConfig2);
 
-    appRunner1.run();
-    appRunner2.run();
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
 
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
@@ -636,7 +636,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     ApplicationRunner appRunner3 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
         TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch3, null, kafkaEventsConsumedLatch,
         applicationConfig3), applicationConfig3);
-    appRunner3.run();
+    executeRun(appRunner3, applicationConfig3);
 
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * 
NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
@@ -674,7 +674,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
             TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch1,
             applicationConfig1), applicationConfig1);
 
-    appRunner1.run();
+    executeRun(appRunner1, applicationConfig1);
     processedMessagesLatch1.await();
 
     String jobModelVersion = zkUtils.getJobModelVersion();

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
new file mode 100644
index 0000000..475617c
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.samzasql;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+
+
+public class SamzaSqlIntegrationTestHarness extends 
AbstractIntegrationTestHarness {
+  protected void runApplication(Config config) {
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, 
config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index d593870..a009e41 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -35,14 +35,12 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.serializers.JsonSerdeV2Factory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.MyTestUdf;
 import org.apache.samza.sql.testutil.SampleRelConverterFactory;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -51,7 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
+public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
   private final Map<String, String> configs = new HashMap<>();
@@ -80,8 +78,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -104,8 +101,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -124,8 +120,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> x.getMessage() == null ? null : 
Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
@@ -149,8 +144,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     String sql = "Insert into testavro2.SIMPLE1 select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -169,8 +163,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     String sql2 = "Insert into testavro.SIMPLE3 select * from 
testavro.SIMPLE2";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -194,8 +187,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -215,8 +207,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     String sql2 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -236,8 +227,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     String sql2 = "Insert into testavro.SIMPLE3 select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -258,8 +248,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
         + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) 
+ MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -282,8 +271,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -304,8 +292,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
         "Insert into testavro.outputTopic(id) select Flatten(a) as id from 
(select MyTestArray(id) a from testavro.SIMPLE1)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -326,8 +313,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
         + "select id, MyTest(id) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     LOG.info("output Messages " + TestAvroSystemFactory.messages);
 
@@ -354,8 +340,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
             + "where RegexMatch('.*4', name)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     LOG.info("output Messages " + TestAvroSystemFactory.messages);
     // There should be two messages that contain "4"
@@ -379,8 +364,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -409,8 +393,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -439,8 +422,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -469,8 +451,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> {
@@ -505,8 +486,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -538,8 +518,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -568,8 +547,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -598,8 +576,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -630,8 +607,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -661,8 +637,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -692,8 +667,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -726,8 +700,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    runApplication(new MapConfig(staticConfigs));
 
     // Let's capture the list of windows/counts per key.
     HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
new file mode 100644
index 0000000..439b8f7
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -0,0 +1,222 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.test.samzasql;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
+  @Test
+  public void testSinkEndToEndWithKey() {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    String sql = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  @Test
+  public void testSinkEndToEndWithKeyWithNullRecords() {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    Map<String, String> props = new HashMap<>();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages, 
false, true);
+
+    String sql = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    Assert.assertEquals(numMessages - ((numMessages - 1) / 
TestAvroSystemFactory.NULL_RECORD_FREQUENCY + 1),
+        RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  @Test (expected = AssertionError.class)
+  public void testSinkEndToEndWithoutKey() {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) 
select id, name from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  @Test
+  public void testSourceEndToEndWithKey() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
+  public void testSourceEndToEndWithKeyWithNullForeignKeys() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages / 2, outMessages.size());
+    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
+  public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "right join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
+  public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
+    int numMessages = 21;
+
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
+    populateProfileTable(staticConfigs, numMessages);
+
+    // The query below reads messages from a stream and deletes the 
corresponding records from the table.
+    // Since every other message in the stream has a null foreign key, only 
half of the messages join
+    // successfully and hence only half of the records in the table are 
deleted. The join (an inner join,
+    // despite the test name) is redundant here and is kept purely for 
testing purposes.
+    String sql =
+        "Insert into testRemoteStore.Profile.`$table` "
+            + "select p.__key__ as __key__ "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId ";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    Assert.assertEquals((numMessages + 1) / 2, 
RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  private void populateProfileTable(Map<String, String> staticConfigs, int 
numMessages) {
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    String sql = "Insert into testRemoteStore.Profile.`$table` select * from 
testavro.PROFILE";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index e112804..b447493 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -27,7 +27,9 @@ import java.util.Map;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
@@ -78,7 +80,7 @@ import static org.junit.Assert.assertTrue;
 public class TestLocalTable extends AbstractIntegrationTestHarness {
 
   @Test
-  public void testSendTo() throws  Exception {
+  public void testSendTo() throws Exception {
 
     int count = 10;
     Profile[] profiles = TestTableData.generateProfiles(count);
@@ -104,8 +106,9 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
           .sendTo(table);
     };
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new 
MapConfig(configs));
-    runner.run();
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, 
config);
+    executeRun(runner, config);
     runner.waitForFinish();
 
     for (int i = 0; i < partitionCount; i++) {
@@ -115,48 +118,29 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
     }
   }
 
-  static class TestStreamTableJoin {
+  static class StreamTableJoinApp implements StreamApplication {
     static List<PageView> received = new LinkedList<>();
     static List<EnrichedPageView> joined = new LinkedList<>();
-    final int count;
-    final int partitionCount;
-    final Map<String, String> configs;
-
-    TestStreamTableJoin(int count, int partitionCount, Map<String, String> 
configs) {
-      this.count = count;
-      this.partitionCount = partitionCount;
-      this.configs = configs;
-    }
 
-    void runTest() {
-      final StreamApplication app = appDesc -> {
-
-        Table<KV<Integer, Profile>> table = appDesc.getTable(
-            new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), 
new ProfileJsonSerde())));
-        DelegatingSystemDescriptor ksd = new 
DelegatingSystemDescriptor("test");
-        GenericInputDescriptor<Profile> profileISD = 
ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-        appDesc.getInputStream(profileISD)
-            .map(m -> new KV(m.getMemberId(), m))
-            .sendTo(table);
-
-        GenericInputDescriptor<PageView> pageViewISD = 
ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-        appDesc.getInputStream(pageViewISD)
-            .map(pv -> {
-                received.add(pv);
-                return pv;
-              })
-            .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new 
NoOpSerde<>(), new NoOpSerde<>()), "p1")
-            .join(table, new PageViewToProfileJoinFunction())
-            .sink((m, collector, coordinator) -> joined.add(m));
-      };
-
-      final LocalApplicationRunner runner = new LocalApplicationRunner(app, 
new MapConfig(configs));
-      runner.run();
-      runner.waitForFinish();
-
-      assertEquals(count * partitionCount, received.size());
-      assertEquals(count * partitionCount, joined.size());
-      assertTrue(joined.get(0) instanceof EnrichedPageView);
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      Table<KV<Integer, Profile>> table = appDesc.getTable(
+          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new 
ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileISD = 
ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.getInputStream(profileISD)
+          .map(m -> new KV(m.getMemberId(), m))
+          .sendTo(table);
+
+      GenericInputDescriptor<PageView> pageViewISD = 
ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.getInputStream(pageViewISD)
+          .map(pv -> {
+              received.add(pv);
+              return pv;
+            })
+          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new 
NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .join(table, new PageViewToProfileJoinFunction())
+          .sink((m, collector, coordinator) -> joined.add(m));
     }
   }
 
@@ -179,85 +163,66 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     configs.put("streams.Profile.source", 
Base64Serializer.serialize(profiles));
     configs.put("streams.Profile.partitionCount", 
String.valueOf(partitionCount));
 
-    TestStreamTableJoin joinTest = new TestStreamTableJoin(count, 
partitionCount, configs);
-    joinTest.runTest();
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
StreamTableJoinApp(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
+    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
+    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
   }
 
-  static class TestDualStreamTableJoin {
+  static class DualStreamTableJoinApp implements StreamApplication {
     static List<Profile> sentToProfileTable1 = new LinkedList<>();
     static List<Profile> sentToProfileTable2 = new LinkedList<>();
     static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
     static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
-    final int count;
-    final int partitionCount;
-    final Map<String, String> configs;
-
-    TestDualStreamTableJoin(int count, int partitionCount, Map<String, String> 
configs) {
-      this.count = count;
-      this.partitionCount = partitionCount;
-      this.configs = configs;
-    }
 
-    void runTest() {
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
       KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new 
IntegerSerde(), new ProfileJsonSerde());
       KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new 
IntegerSerde(), new PageViewJsonSerde());
 
       PageViewToProfileJoinFunction joinFn1 = new 
PageViewToProfileJoinFunction();
       PageViewToProfileJoinFunction joinFn2 = new 
PageViewToProfileJoinFunction();
 
-      final StreamApplication app = appDesc -> {
-
-        Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new 
InMemoryTableDescriptor("t1", profileKVSerde));
-
-        DelegatingSystemDescriptor ksd = new 
DelegatingSystemDescriptor("test");
-        GenericInputDescriptor<Profile> profileISD1 = 
ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
-        GenericInputDescriptor<Profile> profileISD2 = 
ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
-        MessageStream<Profile> profileStream1 = 
appDesc.getInputStream(profileISD1);
-        MessageStream<Profile> profileStream2 = 
appDesc.getInputStream(profileISD2);
-
-        profileStream1
-            .map(m -> {
-                sentToProfileTable1.add(m);
-                return new KV(m.getMemberId(), m);
-              })
-            .sendTo(profileTable);
-        profileStream2
-            .map(m -> {
-                sentToProfileTable2.add(m);
-                return new KV(m.getMemberId(), m);
-              })
-            .sendTo(profileTable);
-
-        GenericInputDescriptor<PageView> pageViewISD1 = 
ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
-        GenericInputDescriptor<PageView> pageViewISD2 = 
ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
-        MessageStream<PageView> pageViewStream1 = 
appDesc.getInputStream(pageViewISD1);
-        MessageStream<PageView> pageViewStream2 = 
appDesc.getInputStream(pageViewISD2);
-
-        pageViewStream1
-            .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
-            .join(profileTable, joinFn1)
-            .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
-
-        pageViewStream2
-            .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
-            .join(profileTable, joinFn2)
-            .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
-      };
-
-      final LocalApplicationRunner runner = new LocalApplicationRunner(app, 
new MapConfig(configs));
-      runner.run();
-      runner.waitForFinish();
-
-      assertEquals(count * partitionCount, sentToProfileTable1.size());
-      assertEquals(count * partitionCount, sentToProfileTable2.size());
-
-      assertEquals(count * partitionCount, joinedPageViews1.size());
-      assertEquals(count * partitionCount, joinedPageViews2.size());
-      assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
-      assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new 
InMemoryTableDescriptor("t1", profileKVSerde));
 
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileISD1 = 
ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
+      GenericInputDescriptor<Profile> profileISD2 = 
ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+      MessageStream<Profile> profileStream1 = 
appDesc.getInputStream(profileISD1);
+      MessageStream<Profile> profileStream2 = 
appDesc.getInputStream(profileISD2);
+
+      profileStream1
+          .map(m -> {
+              sentToProfileTable1.add(m);
+              return new KV(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+      profileStream2
+          .map(m -> {
+              sentToProfileTable2.add(m);
+              return new KV(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+
+      GenericInputDescriptor<PageView> pageViewISD1 = 
ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
+      GenericInputDescriptor<PageView> pageViewISD2 = 
ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+      MessageStream<PageView> pageViewStream1 = 
appDesc.getInputStream(pageViewISD1);
+      MessageStream<PageView> pageViewStream2 = 
appDesc.getInputStream(pageViewISD2);
+
+      pageViewStream1
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+          .join(profileTable, joinFn1)
+          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+      pageViewStream2
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+          .join(profileTable, joinFn2)
+          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
     }
-
   }
 
   @Test
@@ -288,8 +253,18 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     configs.put("streams.PageView2.source", 
Base64Serializer.serialize(pageViews));
     configs.put("streams.PageView2.partitionCount", 
String.valueOf(partitionCount));
 
-    TestDualStreamTableJoin dualJoinTest = new TestDualStreamTableJoin(count, 
partitionCount, configs);
-    dualJoinTest.runTest();
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
DualStreamTableJoinApp(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, 
DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(count * partitionCount, 
DualStreamTableJoinApp.sentToProfileTable2.size());
+
+    assertEquals(count * partitionCount, 
DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(count * partitionCount, 
DualStreamTableJoinApp.joinedPageViews2.size());
+    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof 
EnrichedPageView);
+    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof 
EnrichedPageView);
   }
 
   static Map<String, String> getBaseJobConfig(String bootstrapUrl, String 
zkConnect) {
@@ -351,8 +326,9 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     configs.put("streams.PageView.partitionCount", String.valueOf(4));
     configs.put("task.inputs", "test.PageView");
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MyTaskApplication(), new MapConfig(configs));
-    runner.run();
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MyTaskApplication(), config);
+    executeRun(runner, config);
     runner.waitForFinish();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index c9228af..3de8300 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
@@ -223,8 +224,9 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
           .sendTo(outputTable);
     };
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new 
MapConfig(configs));
-    runner.run();
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, 
config);
+    executeRun(runner, config);
     runner.waitForFinish();
 
     int numExpected = count * partitionCount;

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
index 2f18875..1340ca1 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
@@ -22,7 +22,9 @@ import java.util.Properties
 
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.samza.config.{JobConfig, KafkaConsumerConfig, MapConfig}
+import org.apache.samza.config.{Config, JobConfig, KafkaConsumerConfig, 
MapConfig}
+import org.apache.samza.context.ExternalContext
+import org.apache.samza.runtime.ApplicationRunner
 import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaSystemConsumer}
 
 /**
@@ -83,4 +85,16 @@ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHar
     new KafkaSystemAdmin(system, new MapConfig(map), 
KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig));
   }
 
+  protected def executeRun(applicationRunner: ApplicationRunner, config: Config): Unit = {
+    applicationRunner.run(buildExternalContext(config).orNull) // Option.orNull yields null when no ExternalContext was built
+  }
+
+  private def buildExternalContext(config: Config): Option[ExternalContext] = {
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
+     * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components
+     * like the application descriptor may not be available.
+     */
+    None
+  }
 }
\ No newline at end of file

Reply via email to