This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new bad9ee8  [SAMZA-2666] Updating tests to use TestRunner framework (#1510)
bad9ee8 is described below

commit bad9ee81bc065f6dffd55829335434c5d6dfe010
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Aug 5 14:24:17 2021 -0700

    [SAMZA-2666] Updating tests to use TestRunner framework (#1510)
---
 .../EndOfStreamIntegrationTest.java                |  94 +++++-------------
 .../samza/test/operator/RepartitionWindowApp.java  |  65 ------------
 .../samza/test/operator/TestAsyncFlatMap.java      |  94 +++++++++++++-----
 .../test/operator/TestRepartitionWindowApp.java    | 110 ++++++++++++---------
 .../table/TestCouchbaseRemoteTableEndToEnd.java    |  56 +++++------
 .../TestLocalTableWithConfigRewriterEndToEnd.java  |  37 ++++---
 .../TestLocalTableWithLowLevelApiEndToEnd.java     |  37 +++----
 .../samza/test/table/TestRemoteTableEndToEnd.java  |  64 ++++++------
 .../table/TestRemoteTableWithBatchEndToEnd.java    |  64 +++++-------
 .../org/apache/samza/test/table/TestTableData.java |   4 +-
 10 files changed, 269 insertions(+), 356 deletions(-)

diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index e53f701..e5b8673 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -19,33 +19,22 @@
 
 package org.apache.samza.test.controlmessages;
 
+import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.test.controlmessages.TestData.PageView;
-import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.test.framework.TestRunner;
+import 
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import 
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
+import org.apache.samza.test.table.TestTableData;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -54,49 +43,12 @@ import static org.junit.Assert.assertEquals;
  * This test uses an array as a bounded input source, and does a partitionBy() 
and sink() after reading the input.
  * It verifies the pipeline will stop and the number of output messages should 
equal to the input.
  */
-public class EndOfStreamIntegrationTest extends IntegrationTestHarness {
-
-  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", 
"group", "job"};
-
-  private static List<PageView> received = new ArrayList<>();
+public class EndOfStreamIntegrationTest {
+  private static final List<PageView> RECEIVED = new ArrayList<>();
 
   @Test
-  public void testPipeline() throws  Exception {
-    Random random = new Random();
-    int count = 10;
-    PageView[] pageviews = new PageView[count];
-    for (int i = 0; i < count; i++) {
-      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
-      int memberId = random.nextInt(10);
-      pageviews[i] = new PageView(pagekey, memberId);
-    }
-
-    int partitionCount = 4;
-    Map<String, String> configs = new HashMap<>();
-    configs.put(ApplicationConfig.APP_RUNNER_CLASS, 
"org.apache.samza.runtime.LocalApplicationRunner");
-    configs.put("systems.test.samza.factory", 
ArraySystemFactory.class.getName());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", 
Base64Serializer.serialize(pageviews));
-    configs.put("streams.PageView.partitionCount", 
String.valueOf(partitionCount));
-
-    configs.put(JobConfig.JOB_NAME, "test-eos-job");
-    configs.put(JobConfig.PROCESSOR_ID, "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
-    configs.put(TaskConfig.GROUPER_FACTORY, 
SingleContainerGrouperFactory.class.getName());
-
-    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
-    configs.put("systems.kafka.samza.key.serde", "int");
-    configs.put("systems.kafka.samza.msg.serde", "json");
-    configs.put("systems.kafka.default.stream.replication.factor", "1");
-    configs.put("job.default.system", "kafka");
-
-    configs.put("serializers.registry.int.class", 
"org.apache.samza.serializers.IntegerSerdeFactory");
-    configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
-
+  public void testPipeline() {
     class PipelineApplication implements StreamApplication {
-
       @Override
       public void describe(StreamApplicationDescriptor appDescriptor) {
         DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
@@ -104,20 +56,22 @@ public class EndOfStreamIntegrationTest extends 
IntegrationTestHarness {
             sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), 
new NoOpSerde<>()));
         appDescriptor.getInputStream(isd)
             .map(KV::getValue)
-            .partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new 
NoOpSerde<>(), new NoOpSerde<>()), "p1")
+            .partitionBy(PageView::getMemberId, pv -> pv,
+                KVSerde.of(new IntegerSerde(), new 
TestTableData.PageViewJsonSerde()), "p1")
             .sink((m, collector, coordinator) -> {
-              received.add(m.getValue());
+              RECEIVED.add(m.getValue());
             });
       }
     }
 
-    Config config = new MapConfig(configs);
-    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new PipelineApplication(),
-        config);
-
-    executeRun(runner, config);
-    runner.waitForFinish();
+    int numPageViews = 40;
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+        .getInputDescriptor("PageView", new NoOpSerde<>());
+    TestRunner.of(new PipelineApplication())
+        .addInputStream(inputDescriptor, 
TestTableData.generatePartitionedPageViews(numPageViews, 4))
+        .run(Duration.ofSeconds(10));
 
-    assertEquals(received.size(), count * partitionCount);
+    assertEquals(RECEIVED.size(), numPageViews);
   }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
deleted file mode 100644
index fe8e318..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.operator;
-
-import java.time.Duration;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
-import org.apache.samza.test.operator.data.PageView;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link StreamApplication} that demonstrates a repartition followed by a 
windowed count.
- */
-public class RepartitionWindowApp implements StreamApplication {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(RepartitionWindowApp.class);
-  static final String SYSTEM = "kafka";
-  static final String INPUT_TOPIC = "page-views";
-  static final String OUTPUT_TOPIC = "Result";
-
-
-  @Override
-  public void describe(StreamApplicationDescriptor appDescriptor) {
-    KVSerde<String, PageView> inputSerde = KVSerde.of(new 
StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
-    KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new 
StringSerde());
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
-    KafkaInputDescriptor<KV<String, PageView>> id = 
ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
-    KafkaOutputDescriptor<KV<String, String>> od = 
ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
-
-    appDescriptor.getInputStream(id)
-        .map(KV::getValue)
-        .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1")
-        .window(Windows.keyedSessionWindow(m -> m.getKey(), 
Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new 
IntegerSerde()), "w1")
-        .map(wp -> KV.of(wp.getKey().getKey().toString(), 
String.valueOf(wp.getMessage())))
-        .sendTo(appDescriptor.getOutputStream(od));
-
-  }
-}
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
index a60aea7..d5a5e7e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.test.operator;
 
-import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Collection;
@@ -31,12 +30,12 @@ import java.util.concurrent.CompletionStage;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.NoOpSerde;
@@ -46,14 +45,15 @@ import org.apache.samza.test.framework.TestRunner;
 import 
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
 import 
org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
 import 
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.IntegrationTestHarness;
 import org.apache.samza.test.operator.data.PageView;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
-public class TestAsyncFlatMap extends IntegrationTestHarness {
+public class TestAsyncFlatMap {
   private static final String TEST_SYSTEM = "test";
   private static final String PAGE_VIEW_STREAM = "test-async-page-view-stream";
   private static final String NON_GUEST_PAGE_VIEW_STREAM = 
"test-async-non-guest-page-view-stream";
@@ -68,66 +68,97 @@ public class TestAsyncFlatMap extends 
IntegrationTestHarness {
       new PageView("3", "profile-page", "0"),
       new PageView("4", LOGIN_PAGE, "0"));
 
-
   @Test
   public void testProcessingFutureCompletesSuccessfully() {
     List<PageView> expectedPageViews = PAGE_VIEWS.stream()
-        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && 
Long.valueOf(pageView.getUserId()) > 0)
+        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && 
Long.parseLong(pageView.getUserId()) > 0)
         .collect(Collectors.toList());
 
-    List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+    List<PageView> actualPageViews = runTest(new HashMap<>());
     assertEquals("Mismatch between expected vs actual page views", 
expectedPageViews, actualPageViews);
   }
 
-  @Test(expected = SamzaException.class)
+  @Test
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
     configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a task callback timeout");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the 
actual cause. The timeout message is
+       * nested within a bunch of other exceptions.
+       */
+      Throwable rootCause = findRootCause(e);
+      assertTrue(rootCause instanceof SamzaException);
+      // the "{}" is intentional, since the exception message actually 
includes it (probably a logging bug)
+      assertEquals("Callback for task {} Partition 0 timed out after 100 ms.", 
rootCause.getMessage());
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testProcessingExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_PROCESS, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a 
ProcessFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the 
actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof ProcessFailureException);
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testDownstreamOperatorExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a FilterFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the 
actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof FilterFailureException);
+    }
   }
 
-  private List<PageView> runTest(List<PageView> pageViews, Map<String, String> 
configs) {
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, 
PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+  private List<PageView> runTest(Map<String, String> configs) {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
         .getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
     InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
         .getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
 
     TestRunner
         .of(new AsyncFlatMapExample())
-        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
         .addOutputStream(outputStreamDesc, 1)
         .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(50000));
+        .run(Duration.ofSeconds(10));
 
     Map<Integer, List<PageView>> result = 
TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-    List<PageView> results = result.values().stream()
+    return result.values().stream()
         .flatMap(List::stream)
         .collect(Collectors.toList());
+  }
 
-    return results;
+  private static Throwable findRootCause(Throwable e) {
+    Throwable currentException = e;
+    while (currentException.getCause() != null) {
+      currentException = currentException.getCause();
+    }
+    return currentException;
   }
 
   static class AsyncFlatMapExample implements StreamApplication {
@@ -158,11 +189,11 @@ public class TestAsyncFlatMap extends 
IntegrationTestHarness {
           System.out.println("Interrupted during sleep.");
         }
 
-        return Long.valueOf(pageView.getUserId()) < 1 ? 
Collections.emptyList() : Collections.singleton(pageView);
+        return Long.parseLong(pageView.getUserId()) < 1 ? 
Collections.emptyList() : Collections.singleton(pageView);
       });
 
       if (shouldFailProcess.test(pageView)) {
-        filteredPageViews.completeExceptionally(new RuntimeException("Remote 
service threw an exception"));
+        filteredPageViews.completeExceptionally(new 
ProcessFailureException("Remote service threw an exception"));
       }
 
       return filteredPageViews;
@@ -170,11 +201,22 @@ public class TestAsyncFlatMap extends 
IntegrationTestHarness {
 
     private static boolean filterLoginPageViews(PageView pageView, 
Predicate<PageView> shouldFailProcess) {
       if (shouldFailProcess.test(pageView)) {
-        throw new RuntimeException("Filtering login page views ran into an 
exception");
+        throw new FilterFailureException("Filtering login page views ran into 
an exception");
       }
 
       return !LOGIN_PAGE.equals(pageView.getPageId());
     }
+  }
 
+  private static class ProcessFailureException extends RuntimeException {
+    public ProcessFailureException(String s) {
+      super(s);
+    }
+  }
+
+  private static class FilterFailureException extends RuntimeException {
+    public FilterFailureException(String s) {
+      super(s);
+    }
   }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 77ada4a..7add331 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -18,69 +18,87 @@
  */
 package org.apache.samza.test.operator;
 
-import java.util.Collections;
+import java.time.Duration;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
+import org.apache.samza.test.framework.StreamAssert;
+import org.apache.samza.test.framework.TestRunner;
+import 
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import 
org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import 
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.samza.test.operator.RepartitionWindowApp.*;
 
 /**
  * Test driver for {@link RepartitionWindowApp}.
  */
-public class TestRepartitionWindowApp extends 
StreamApplicationIntegrationTestHarness {
-  private static final String APP_NAME = "PageViewCounterApp";
+public class TestRepartitionWindowApp {
+  private static final String SYSTEM = "test";
+  private static final String INPUT_TOPIC = "page-views";
+  private static final String OUTPUT_TOPIC = "Result";
 
   @Test
   public void testRepartitionedSessionWindowCounter() throws Exception {
-    // create topics
-    createTopic(INPUT_TOPIC, 3);
-    createTopic(OUTPUT_TOPIC, 1);
+    Map<Integer, List<KV<String, PageView>>> pageViews = new HashMap<>();
+    pageViews.put(0, ImmutableList.of(KV.of("userId1", new PageView("india", 
"5.com", "userId1")),
+        KV.of("userId1", new PageView("india", "2.com", "userId1"))));
+    pageViews.put(1, ImmutableList.of(KV.of("userId2", new PageView("china", 
"4.com", "userId2")),
+        KV.of("userId1", new PageView("india", "3.com", "userId1"))));
+    pageViews.put(2, ImmutableList.of(KV.of("userId1", new PageView("india", 
"1.com", "userId1"))));
 
-    // produce messages to different partitions.
-    ObjectMapper mapper = new ObjectMapper();
-    PageView pv = new PageView("india", "5.com", "userId1");
-    produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
-    pv = new PageView("china", "4.com", "userId2");
-    produceMessage(INPUT_TOPIC, 1, "userId2", mapper.writeValueAsString(pv));
-    pv = new PageView("india", "1.com", "userId1");
-    produceMessage(INPUT_TOPIC, 2, "userId1", mapper.writeValueAsString(pv));
-    pv = new PageView("india", "2.com", "userId1");
-    produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
-    pv = new PageView("india", "3.com", "userId1");
-    produceMessage(INPUT_TOPIC, 1, "userId1", mapper.writeValueAsString(pv));
+    InMemorySystemDescriptor sd = new InMemorySystemDescriptor(SYSTEM);
+    InMemoryInputDescriptor<KV<String, PageView>> inputDescriptor =
+        sd.getInputDescriptor(INPUT_TOPIC, KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
+    /*
+     * Technically, this should have a message type of KV, because a KV is 
passed to sendTo, but
+     * StreamAssert.containsInAnyOrder requires the type to match the output 
type of the actual messages. In
+     * high-level, sendTo splits up the KV, so the actual messages are just 
the "V" part of the KV.
+     * TestRunner only uses NoOpSerde anyways, so it doesn't matter if the 
typing isn't KV.
+     */
+    InMemoryOutputDescriptor<String> outputDescriptor = 
sd.getOutputDescriptor(OUTPUT_TOPIC, new NoOpSerde<>());
+    TestRunner.of(new RepartitionWindowApp())
+        .addInputStream(inputDescriptor, pageViews)
+        .addOutputStream(outputDescriptor, 1)
+        .addConfig("task.window.ms", "1000")
+        .run(Duration.ofSeconds(10));
 
-    Map<String, String> configs = new HashMap<>();
-    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-    configs.put(JobConfig.PROCESSOR_ID, "0");
-    configs.put(TaskConfig.GROUPER_FACTORY, 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-
-    // run the application
-    runApplication(new RepartitionWindowApp(), APP_NAME, configs);
+    StreamAssert.containsInAnyOrder(Arrays.asList("userId1 4", "userId2 1"), 
outputDescriptor,
+        Duration.ofSeconds(1));
+  }
 
-    // consume and validate result
-    List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
-    Assert.assertEquals(messages.size(), 2);
+  /**
+   * A {@link StreamApplication} that demonstrates a repartition followed by a 
windowed count.
+   */
+  private static class RepartitionWindowApp implements StreamApplication {
+    @Override
+    public void describe(StreamApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM);
+      GenericInputDescriptor<KV<String, PageView>> inputDescriptor =
+          sd.getInputDescriptor(INPUT_TOPIC, KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
+      GenericOutputDescriptor<KV<String, String>> outputDescriptor =
+          sd.getOutputDescriptor(OUTPUT_TOPIC, KVSerde.of(new NoOpSerde<>(), 
new NoOpSerde<>()));
 
-    for (ConsumerRecord<String, String> message : messages) {
-      String key = message.key();
-      String value = message.value();
-      // Assert that there are 4 messages for userId1 and 1 message for 
userId2.
-      Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
-      if ("userId1".equals(key)) {
-        Assert.assertEquals(value, "4");
-      } else {
-        Assert.assertEquals(value, "1");
-      }
+      appDescriptor.getInputStream(inputDescriptor)
+          .map(KV::getValue)
+          .partitionBy(PageView::getUserId, m -> m, KVSerde.of(new 
NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .window(Windows.keyedSessionWindow(KV::getKey, 
Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new 
IntegerSerde()), "w1")
+          .map(wp -> KV.of(wp.getKey().getKey(), wp.getKey().getKey() + " " + 
wp.getMessage()))
+          .sendTo(appDescriptor.getOutputStream(outputDescriptor));
     }
   }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
index 834c356..1d52bdc 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestCouchbaseRemoteTableEndToEnd.java
@@ -29,16 +29,14 @@ import 
com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
 import com.couchbase.mock.BucketConfiguration;
 import com.couchbase.mock.CouchbaseMock;
 
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -48,8 +46,9 @@ import 
org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.remote.NoOpTableReadFunction;
 import org.apache.samza.table.remote.couchbase.CouchbaseTableReadFunction;
 import org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.test.framework.TestRunner;
+import 
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import 
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -64,14 +63,14 @@ import org.junit.Test;
  * CouchbaseMock library, closing a mocked bucket would throw exceptions like: 
java.lang.ArithmeticException: / by zero.
  * Please ignore those exception.
  */
-public class TestCouchbaseRemoteTableEndToEnd extends IntegrationTestHarness {
-  protected CouchbaseEnvironment couchbaseEnvironment;
-  protected CouchbaseMock couchbaseMock;
-  protected Cluster cluster;
-  protected String inputBucketName = "inputBucket";
-  protected String outputBucketName = "outputBucket";
-
-  protected void createMockBuckets(List<String> bucketNames) throws Exception {
+public class TestCouchbaseRemoteTableEndToEnd {
+  private CouchbaseEnvironment couchbaseEnvironment;
+  private CouchbaseMock couchbaseMock;
+  private Cluster cluster;
+  private String inputBucketName = "inputBucket";
+  private String outputBucketName = "outputBucket";
+
+  private void createMockBuckets(List<String> bucketNames) throws Exception {
     ArrayList<BucketConfiguration> configList = new ArrayList<>();
     bucketNames.forEach(name -> configList.add(configBucket(name)));
     couchbaseMock = new CouchbaseMock(0, configList);
@@ -79,7 +78,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends 
IntegrationTestHarness {
     couchbaseMock.waitForStartup();
   }
 
-  protected BucketConfiguration configBucket(String bucketName) {
+  private BucketConfiguration configBucket(String bucketName) {
     BucketConfiguration config = new BucketConfiguration();
     config.numNodes = 1;
     config.numReplicas = 1;
@@ -87,7 +86,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends 
IntegrationTestHarness {
     return config;
   }
 
-  protected void initClient() {
+  private void initClient() {
     couchbaseEnvironment = DefaultCouchbaseEnvironment.builder()
         
.bootstrapCarrierDirectPort(couchbaseMock.getCarrierPort("inputBucket"))
         .bootstrapHttpDirectPort(couchbaseMock.getHttpPort())
@@ -110,8 +109,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends 
IntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEnd() throws Exception {
-
+  public void testEndToEnd() {
     Bucket inputBucket = cluster.openBucket(inputBucketName);
     inputBucket.upsert(ByteArrayDocument.create("Alice", "20".getBytes()));
     inputBucket.upsert(ByteArrayDocument.create("Bob", "30".getBytes()));
@@ -119,15 +117,7 @@ public class TestCouchbaseRemoteTableEndToEnd extends IntegrationTestHarness {
     inputBucket.upsert(ByteArrayDocument.create("David", "50".getBytes()));
     inputBucket.close();
 
-    String[] users = new String[]{"Alice", "Bob", "Chris", "David"};
-
-    int partitionCount = 1;
-    Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.User.samza.system", "test");
-    configs.put("streams.User.source", Base64Serializer.serialize(users));
-    configs.put("streams.User.partitionCount", String.valueOf(partitionCount));
-    Config config = new MapConfig(configs);
+    List<String> users = Arrays.asList("Alice", "Bob", "Chris", "David");
 
     final StreamApplication app = appDesc -> {
       DelegatingSystemDescriptor inputSystemDescriptor = new DelegatingSystemDescriptor("test");
@@ -162,9 +152,12 @@ public class TestCouchbaseRemoteTableEndToEnd extends IntegrationTestHarness {
           .sendTo(outputTable);
     };
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+        .getInputDescriptor("User", new NoOpSerde<>());
+    TestRunner.of(app)
+        .addInputStream(inputDescriptor, users)
+        .run(Duration.ofSeconds(10));
 
     Bucket outputBucket = cluster.openBucket(outputBucketName);
     Assert.assertEquals("{\"name\":\"Alice\",\"age\":\"20\"}", outputBucket.get("Alice").content().toString());
@@ -192,5 +185,4 @@ public class TestCouchbaseRemoteTableEndToEnd extends IntegrationTestHarness {
       return record.getKey();
     }
   }
-
-}
+}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
index c84945d..6c1f38c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.table;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,6 @@ import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigRewriter;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -39,31 +39,30 @@ import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.table.TableConfigGenerator;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 
 import org.junit.Test;
 
-import static org.apache.samza.test.table.TestLocalTableEndToEnd.getBaseJobConfig;
 import static org.apache.samza.test.table.TestLocalTableWithLowLevelApiEndToEnd.MyStreamTask;
 
 
-public class TestLocalTableWithConfigRewriterEndToEnd extends IntegrationTestHarness {
-
+public class TestLocalTableWithConfigRewriterEndToEnd {
+  /**
+   * MyTaskApplication does not include table descriptor, so if the rewriter does not add the table configs properly,
+   * then the application will fail to execute.
+   */
   @Test
-  public void testWithConfigRewriter() throws Exception {
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
-    configs.put("streams.PageView.partitionCount", String.valueOf(4));
-    configs.put("task.inputs", "test.PageView");
-    configs.put("job.config.rewriter.my-rewriter.class", MyConfigRewriter.class.getName());
-    configs.put("job.config.rewriters", "my-rewriter");
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+  public void testWithConfigRewriter() {
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+        .getInputDescriptor("PageView", new NoOpSerde<>());
+    TestRunner.of(new MyTaskApplication())
+        .addInputStream(inputDescriptor, TestTableData.generatePartitionedPageViews(40, 4))
+        .addConfig("job.config.rewriters", "my-rewriter")
+        .addConfig("job.config.rewriter.my-rewriter.class", MyConfigRewriter.class.getName())
+        .run(Duration.ofSeconds(10));
   }
 
   static public class MyConfigRewriter implements ConfigRewriter {
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
index 164e74b..734b0ba 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
@@ -19,14 +19,10 @@
 
 package org.apache.samza.test.table;
 
-import java.util.Map;
-
+import java.time.Duration;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -40,29 +36,24 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.harness.IntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
-
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.samza.test.table.TestLocalTableEndToEnd.getBaseJobConfig;
-
-
-public class TestLocalTableWithLowLevelApiEndToEnd extends IntegrationTestHarness {
 
+public class TestLocalTableWithLowLevelApiEndToEnd {
   @Test
-  public void testTableWithLowLevelApi() throws Exception {
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
-    configs.put("streams.PageView.partitionCount", String.valueOf(4));
-    configs.put("task.inputs", "test.PageView");
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+  public void testTableWithLowLevelApi() {
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<TestTableData.PageView> inputDescriptor = isd
+        .getInputDescriptor("PageView", new NoOpSerde<>());
+    TestRunner.of(new TestLocalTableWithConfigRewriterEndToEnd.MyTaskApplication())
+        .addInputStream(inputDescriptor, TestTableData.generatePartitionedPageViews(40, 4))
+        .addConfig("job.config.rewriters", "my-rewriter")
+        .addConfig("job.config.rewriter.my-rewriter.class", TestLocalTableWithConfigRewriterEndToEnd.MyConfigRewriter.class.getName())
+        .run(Duration.ofSeconds(10));
   }
 
   static public class MyTaskApplication implements TaskApplication {
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
index 2b89ba9..6f17a10 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
@@ -39,7 +39,6 @@ import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
@@ -53,7 +52,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
@@ -65,7 +63,9 @@ import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
 
@@ -75,7 +75,6 @@ import org.junit.Test;
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.generatePageViews;
 import static org.apache.samza.test.table.TestTableData.generateProfiles;
 
 import static org.mockito.Matchers.any;
@@ -83,10 +82,9 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
 
 
-public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
-
-  static Map<String, AtomicInteger> counters = new HashMap<>();
-  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+public class TestRemoteTableEndToEnd {
+  private static final Map<String, AtomicInteger> COUNTERS = new HashMap<>();
+  private static final Map<String, List<EnrichedPageView>> WRITTEN_RECORDS = new HashMap<>();
 
   static class InMemoryProfileReadFunction extends BaseTableFunction
       implements TableReadFunction<Integer, Profile> {
@@ -144,7 +142,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
       in.defaultReadObject();
 
       // Write to the global list for verification
-      records = writtenRecords.get(testName);
+      records = WRITTEN_RECORDS.get(testName);
     }
 
     @Override
@@ -189,7 +187,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     public CompletableFuture readAsync(int opId, Object... args) {
       if (1 == opId) {
         boolean shouldReturnValue = (boolean) args[0];
-        AtomicInteger counter = counters.get(testName);
+        AtomicInteger counter = COUNTERS.get(testName);
         Integer result = shouldReturnValue ? counter.get() : null;
         return CompletableFuture.completedFuture(result);
       } else {
@@ -230,7 +228,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     @Override
     public CompletableFuture writeAsync(int opId, Object... args) {
       Integer result;
-      AtomicInteger counter = counters.get(testName);
+      AtomicInteger counter = COUNTERS.get(testName);
       boolean shouldModify = (boolean) args[0];
       switch (opId) {
         case 1:
@@ -251,7 +249,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     }
   }
 
-  static private class TestReadWriteMapFunction implements MapFunction<PageView, PageView> {
+  private static class TestReadWriteMapFunction implements MapFunction<PageView, PageView> {
 
     private final String counterTableName;
     private ReadWriteTable counterTable;
@@ -323,20 +321,12 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     return appDesc.getTable(cachingDesc);
   }
 
-  private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, boolean withArgs, String testName)
-      throws Exception {
-
-    writtenRecords.put(testName, new ArrayList<>());
-
-    int count = 10;
-    final PageView[] pageViews = generatePageViews(count);
-    final String profiles = Base64Serializer.serialize(generateProfiles(count));
+  private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, boolean withArgs,
+      String testName) throws Exception {
+    WRITTEN_RECORDS.put(testName, new ArrayList<>());
 
-    final int partitionCount = 4;
-    final Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+    // max member id for page views is 10
+    final String profiles = Base64Serializer.serialize(generateProfiles(10));
 
     final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
     final TableRateLimiter.CreditFunction creditFunction = (k, v, args) -> 1;
@@ -373,7 +363,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
             .sendTo(outputTable);
 
       } else {
-        counters.put(testName, new AtomicInteger());
+        COUNTERS.put(testName, new AtomicInteger());
 
         final RemoteTableDescriptor counterTableDesc =
             new RemoteTableDescriptor("counter-table-1")
@@ -395,19 +385,21 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
       }
     };
 
-    final Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+    int numPageViews = 40;
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<PageView> inputDescriptor = isd
+        .getInputDescriptor("PageView", new NoOpSerde<>());
+    TestRunner.of(app)
+        .addInputStream(inputDescriptor, TestTableData.generatePartitionedPageViews(numPageViews, 4))
+        .run(Duration.ofSeconds(10));
 
-    final int numExpected = count * partitionCount;
-    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
-    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
+    Assert.assertEquals(numPageViews, WRITTEN_RECORDS.get(testName).size());
+    Assert.assertNotNull(WRITTEN_RECORDS.get(testName).get(0));
     if (!withArgs) {
-      writtenRecords.get(testName).forEach(epv -> Assert.assertFalse(epv.company.contains("-")));
+      WRITTEN_RECORDS.get(testName).forEach(epv -> Assert.assertFalse(epv.company.contains("-")));
     } else {
-      writtenRecords.get(testName).forEach(epv -> Assert.assertTrue(epv.company.endsWith("-r-w")));
-      Assert.assertEquals(numExpected, counters.get(testName).get());
+      WRITTEN_RECORDS.get(testName).forEach(epv -> Assert.assertTrue(epv.company.endsWith("-r-w")));
+      Assert.assertEquals(numPageViews, COUNTERS.get(testName).get());
     }
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
index 3cad14e..e37bd86 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
@@ -33,10 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -48,7 +45,9 @@ import org.apache.samza.table.remote.BaseTableFunction;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
 import org.junit.Assert;
@@ -58,11 +57,10 @@ import static org.apache.samza.test.table.TestTableData.*;
 import static org.mockito.Mockito.*;
 
 
-public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
-
-  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
-  static Map<String, AtomicInteger> batchWrites = new HashMap<>();
-  static Map<String, AtomicInteger> batchReads = new HashMap<>();
+public class TestRemoteTableWithBatchEndToEnd {
+  private static final Map<String, List<EnrichedPageView>> WRITTEN_RECORDS = new HashMap<>();
+  private static final Map<String, AtomicInteger> BATCH_WRITES = new HashMap<>();
+  private static final Map<String, AtomicInteger> BATCH_READS = new HashMap<>();
 
   static class InMemoryReadFunction extends BaseTableFunction
       implements TableReadFunction<Integer, Profile> {
@@ -80,7 +78,7 @@ public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
       in.defaultReadObject();
       Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
       this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
-      batchReadCounter = batchReads.get(testName);
+      batchReadCounter = BATCH_READS.get(testName);
     }
 
     @Override
@@ -119,8 +117,8 @@ public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
       in.defaultReadObject();
 
       // Write to the global list for verification
-      records = writtenRecords.get(testName);
-      batchWritesCounter = batchWrites.get(testName);
+      records = WRITTEN_RECORDS.get(testName);
+      batchWritesCounter = BATCH_WRITES.get(testName);
     }
 
     @Override
@@ -147,7 +145,6 @@ public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
     }
   }
 
-
   static class MyReadFunction extends BaseTableFunction
       implements TableReadFunction {
     @Override
@@ -164,24 +161,14 @@ public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
   private void doTestStreamTableJoinRemoteTable(String testName, boolean batchRead, boolean batchWrite) throws Exception {
     final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
 
-    batchReads.put(testName, new AtomicInteger());
-    batchWrites.put(testName, new AtomicInteger());
-    writtenRecords.put(testName, new CopyOnWriteArrayList<>());
+    BATCH_READS.put(testName, new AtomicInteger());
+    BATCH_WRITES.put(testName, new AtomicInteger());
+    WRITTEN_RECORDS.put(testName, new CopyOnWriteArrayList<>());
 
-    final int count = 16;
-    final int batchSize = 4;
-    PageView[] pageViews = generatePageViewsWithDistinctKeys(count);
+    int count = 16;
+    int batchSize = 4;
     String profiles = Base64Serializer.serialize(generateProfiles(count));
 
-    int partitionCount = 1;
-    Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-    configs.put("task.max.concurrency", String.valueOf(count));
-    configs.put("task.async.commit", String.valueOf(true));
-
     final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
     final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
     final TableRateLimiter.CreditFunction creditFunction = (k, v, args)->1;
@@ -219,20 +206,23 @@ public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
           .sendTo(outputTable);
     };
 
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<PageView> inputDescriptor = isd
+        .getInputDescriptor("PageView", new NoOpSerde<>());
+    TestRunner.of(app)
+        .addInputStream(inputDescriptor, Arrays.asList(generatePageViewsWithDistinctKeys(count)))
+        .addConfig("task.max.concurrency", String.valueOf(count))
+        .addConfig("task.async.commit", String.valueOf(true))
+        .run(Duration.ofSeconds(10));
 
-    runner.waitForFinish();
-    int numExpected = count * partitionCount;
-    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
-    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
+    Assert.assertEquals(count, WRITTEN_RECORDS.get(testName).size());
+    Assert.assertNotNull(WRITTEN_RECORDS.get(testName).get(0));
 
     if (batchRead) {
-      Assert.assertEquals(numExpected / batchSize, batchReads.get(testName).get());
+      Assert.assertEquals(count / batchSize, BATCH_READS.get(testName).get());
     }
     if (batchWrite) {
-      Assert.assertEquals(numExpected / batchSize, batchWrites.get(testName).get());
+      Assert.assertEquals(count / batchSize, BATCH_WRITES.get(testName).get());
     }
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index 39f9b02..f7d74a6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -244,7 +244,7 @@ public class TestTableData {
     return pageViews;
   }
 
-  static public PageView[] generatePageViewsWithDistinctKeys(int count) {
+  public static PageView[] generatePageViewsWithDistinctKeys(int count) {
     Random random = new Random();
     PageView[] pageviews = new PageView[count];
     for (int i = 0; i < count; i++) {
@@ -256,7 +256,7 @@ public class TestTableData {
 
   private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
 
-  static public Profile[] generateProfiles(int count) {
+  public static Profile[] generateProfiles(int count) {
     Random random = new Random();
     Profile[] profiles = new Profile[count];
     for (int i = 0; i < count; i++) {

Reply via email to