Repository: samza
Updated Branches:
  refs/heads/master 5587ebfec -> 13e26fb45


SAMZA-1759: Stream Assert utilities for low level and high level api for 
TestFramework

Adding utilities and corresponding test for low and high level api

Author: Sanil Jain <[email protected]>

Reviewers: Shanthoosh Venkataraman <[email protected]>

Closes #568 from Sanil15/SAMZA-1759 and squashes the following commits:

a4861089 [Sanil Jain] Reverting back travis increase for wait time
876a3a58 [Sanil Jain] Increase travis timeout
9e6482b1 [Sanil Jain] Fixing travis build, removing unused imports
526244e8 [Sanil Jain] Merge branch 'master' into SAMZA-1759
9f489acf [Sanil Jain] Moving tests that use MessageStreamAssert to same package 
name in test folder to use package private
a93e5a14 [Sanil Jain] Marking collection transient to ensure newer api changes 
work
5e6d3ed1 [Sanil Jain] Making MessageStreamAssert package private
a5a521cc [Sanil Jain] Splitting operator assertions outside StreamAssert to 
MessageStreamAssert, addressing review, renaming utils
d1e64180 [Sanil Jain] Cleaning unused imports
ff218ff7 [Sanil Jain] Removing contains method for operator level assertios for 
high level api
c5768772 [Sanil Jain] Merge branch 'SAMZA-1759' of 
https://github.com/Sanil15/samza into SAMZA-1759
c69d1bbb [Sanil Jain] StreamAssert Utilities for Low level and High Level Api, 
Adding More Test for Low Level api for testing multiple partitions and in 
mulithreaded mode
e3c8e2a5 [Sanil Jain] StreamAssert Utilities for Low level and High Level Api, 
Adding More Test for Low Level api for testing multiple partitions and in 
mulithreaded mode


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/13e26fb4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/13e26fb4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/13e26fb4

Branch: refs/heads/master
Commit: 13e26fb45b69c2f6f74a970475cb8239157d069e
Parents: 5587ebf
Author: Sanil Jain <[email protected]>
Authored: Wed Aug 1 17:18:21 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Aug 1 17:18:21 2018 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   6 +-
 .../test/framework/MessageStreamAssert.java     | 192 ++++++++++++
 .../samza/test/framework/StreamAssert.java      | 220 +++++---------
 .../apache/samza/test/framework/TestRunner.java |   4 +-
 .../test/framework/stream/CollectionStream.java |   2 +-
 .../AsyncStreamTaskIntegrationTest.java         |  79 ++++-
 .../test/framework/BroadcastAssertApp.java      |  58 ++++
 .../samza/test/framework/MyAsyncStreamTask.java |   3 +-
 .../samza/test/framework/MyStreamTestTask.java  |   3 +-
 .../StreamApplicationIntegrationTest.java       |   9 +-
 ...StreamApplicationIntegrationTestHarness.java | 302 ++++++++++++++++++
 .../framework/StreamTaskIntegrationTest.java    |  76 ++++-
 .../samza/test/framework/TestTimerApp.java      |  86 ++++++
 .../apache/samza/test/framework/TimerTest.java  |  50 +++
 .../samza/test/operator/BroadcastAssertApp.java |  59 ----
 ...StreamApplicationIntegrationTestHarness.java | 303 -------------------
 .../operator/TestRepartitionJoinWindowApp.java  |   2 +
 .../test/operator/TestRepartitionWindowApp.java |   1 +
 .../apache/samza/test/timer/TestTimerApp.java   |  87 ------
 .../org/apache/samza/test/timer/TimerTest.java  |  51 ----
 20 files changed, 937 insertions(+), 656 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ef112f2..2a3ae0c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,15 +5,15 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# 
+#
 
 language: java
 

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
 
b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
new file mode 100644
index 0000000..1a1c24c
--- /dev/null
+++ 
b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * An assertion on the content of a {@link MessageStream}.
+ *
+ * <pre>Example: {@code
+ * MessageStream<String> stream = streamGraph.getInputStream("input", 
serde).map(some_function)...;
+ * ...
+ * MessageStreamAssert.that(id, stream, 
stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c"));
+ * }</pre>
+ *
+ */
+@VisibleForTesting
+class MessageStreamAssert<M> {
+  private final static Map<String, CountDownLatch> LATCHES = new 
ConcurrentHashMap<>();
+  private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0);
+
+  private final String id;
+  private final MessageStream<M> messageStream;
+  private final Serde<M> serde;
+  private boolean checkEachTask = false;
+
+  /**
+   * Constructs a MessageStreamAssert with an id and serde
+   * @param id unique id
+   * @param messageStream represents messageStream that you want to assert on
+   * @param serde serde used to deserialize messageStream
+   * @param <M> represents type of Message
+   * @return MessageStreamAssert that returns the messages in the stream
+   */
+  public static <M> MessageStreamAssert<M> that(String id, MessageStream<M> 
messageStream, Serde<M> serde) {
+    return new MessageStreamAssert<>(id, messageStream, serde);
+  }
+
+  private MessageStreamAssert(String id, MessageStream<M> messageStream, 
Serde<M> serde) {
+    this.id = id;
+    this.messageStream = messageStream;
+    this.serde = serde;
+  }
+
+  public MessageStreamAssert forEachTask() {
+    checkEachTask = true;
+    return this;
+  }
+
+  public void containsInAnyOrder(final Collection<M> expected) {
+    LATCHES.putIfAbsent(id, PLACE_HOLDER);
+    final MessageStream<M> streamToCheck = checkEachTask
+        ? messageStream
+        : messageStream
+            .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), 
serde), null)
+            .map(kv -> kv.value);
+
+    streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, 
checkEachTask));
+  }
+
+  public static void waitForComplete() {
+    try {
+      while (!LATCHES.isEmpty()) {
+        final Set<String> ids  = new HashSet<>(LATCHES.keySet());
+        for (String id : ids) {
+          while (LATCHES.get(id) == PLACE_HOLDER) {
+            Thread.sleep(100);
+          }
+
+          final CountDownLatch latch = LATCHES.get(id);
+          if (latch != null) {
+            latch.await();
+            LATCHES.remove(id);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final class CheckAgainstExpected<M> implements 
SinkFunction<M> {
+    private static final long TIMEOUT = 5000L;
+
+    private final String id;
+    private final boolean checkEachTask;
+    private final transient Collection<M> expected;
+
+
+    private transient Timer timer = new Timer();
+    private transient List<M> actual = Collections.synchronizedList(new 
ArrayList<>());
+    private transient TimerTask timerTask = new TimerTask() {
+      @Override
+      public void run() {
+        check();
+      }
+    };
+
+    CheckAgainstExpected(String id, Collection<M> expected, boolean 
checkEachTask) {
+      this.id = id;
+      this.expected = expected;
+      this.checkEachTask = checkEachTask;
+    }
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      final SystemStreamPartition ssp = 
Iterables.getFirst(context.getSystemStreamPartitions(), null);
+      if (ssp != null && ssp.getPartition().getPartitionId() == 0) {
+        final int count = checkEachTask ? 
context.getSamzaContainerContext().taskNames.size() : 1;
+        LATCHES.put(id, new CountDownLatch(count));
+        timer.schedule(timerTask, TIMEOUT);
+      }
+    }
+
+    @Override
+    public void apply(M message, MessageCollector messageCollector, 
TaskCoordinator taskCoordinator) {
+      actual.add(message);
+
+      if (actual.size() >= expected.size()) {
+        timerTask.cancel();
+        check();
+      }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+      in.defaultReadObject();
+      timer = new Timer();
+      actual = Collections.synchronizedList(new ArrayList<>());
+      timerTask = new TimerTask() {
+        @Override
+        public void run() {
+          check();
+        }
+      };
+    }
+
+    private void check() {
+      final CountDownLatch latch = LATCHES.get(id);
+      try {
+        assertThat(actual, Matchers.containsInAnyOrder((M[]) expected.toArray()));
+      } finally {
+        latch.countDown();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index a1ac299..9972d7f 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -19,163 +19,95 @@
 
 package org.apache.samza.test.framework;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.hamcrest.Matchers;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.stream.Collectors;
+import org.apache.samza.test.framework.stream.CollectionStream;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.hamcrest.collection.IsIterableContainingInOrder;
 
 import static org.junit.Assert.assertThat;
 
+
 /**
- * An assertion on the content of a {@link MessageStream}.
- *
- * <pre>Example: {@code
- * MessageStream<String> stream = streamGraph.getInputStream("input", 
serde).map(some_function)...;
- * ...
- * StreamAssert.that(id, stream, 
stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c"));
- * }</pre>
- *
+ * Assertion utils on the content of a {@link CollectionStream}.
  */
-public class StreamAssert<M> {
-  private final static Map<String, CountDownLatch> LATCHES = new 
ConcurrentHashMap<>();
-  private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0);
-
-  private final String id;
-  private final MessageStream<M> messageStream;
-  private final Serde<M> serde;
-  private boolean checkEachTask = false;
-
-  public static <M> StreamAssert<M> that(String id, MessageStream<M> 
messageStream, Serde<M> serde) {
-    return new StreamAssert<>(id, messageStream, serde);
-  }
-
-  private StreamAssert(String id, MessageStream<M> messageStream, Serde<M> 
serde) {
-    this.id = id;
-    this.messageStream = messageStream;
-    this.serde = serde;
-  }
-
-  public StreamAssert forEachTask() {
-    checkEachTask = true;
-    return this;
+public class StreamAssert {
+  /**
+   * Util to assert presence of messages in a stream with single partition in any order
+   *
+   * @param collectionStream represents the actual stream which will be 
consumed to compare against expected list
+   * @param expected represents the expected stream of messages
+   * @param timeout maximum time to wait for consuming the stream
+   * @param <M> represents the type of Message in the stream
+   * @throws InterruptedException when {@code consumeStream} is interrupted by 
another thread during polling messages
+   */
+  public static <M> void containsInAnyOrder(CollectionStream<M> 
collectionStream, final List<M> expected, Duration timeout)
+      throws InterruptedException {
+    Preconditions.checkNotNull(collectionStream, "This util is intended to use 
only on CollectionStream");
+    assertThat(TestRunner.consumeStream(collectionStream, timeout)
+        .entrySet()
+        .stream()
+        .flatMap(entry -> entry.getValue().stream())
+        .collect(Collectors.toList()), 
IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
   }
 
-  public void containsInAnyOrder(final Collection<M> expected) {
-    LATCHES.putIfAbsent(id, PLACE_HOLDER);
-    final MessageStream<M> streamToCheck = checkEachTask
-        ? messageStream
-        : messageStream
-          .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), 
serde), null)
-          .map(kv -> kv.value);
-
-    streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, 
checkEachTask));
-  }
-
-  public static void waitForComplete() {
-    try {
-      while (!LATCHES.isEmpty()) {
-        final Set<String> ids  = new HashSet<>(LATCHES.keySet());
-        for (String id : ids) {
-          while (LATCHES.get(id) == PLACE_HOLDER) {
-            Thread.sleep(100);
-          }
-
-          final CountDownLatch latch = LATCHES.get(id);
-          if (latch != null) {
-            latch.await();
-            LATCHES.remove(id);
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+  /**
+   * Util to assert presence of messages in a stream with multiple partition 
in any order
+   *
+   * @param collectionStream represents the actual stream which will be 
consumed to compare against expected partition map
+   * @param expected represents a map of partitionId as key and list of 
messages in stream as value
+   * @param timeout maximum time to wait for consuming the stream
+   * @param <M> represents the type of Message in the stream
+   * @throws InterruptedException when {@code consumeStream} is interrupted by 
another thread during polling messages
+   *
+   */
+  public static <M> void containsInAnyOrder(CollectionStream<M> 
collectionStream, final Map<Integer, List<M>> expected,
+      Duration timeout) throws InterruptedException {
+    Preconditions.checkNotNull(collectionStream, "This util is intended to use 
only on CollectionStream");
+    Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, 
timeout);
+    for (Integer partitionId : expected.keySet()) {
+      assertThat(actual.get(partitionId),
+          IsIterableContainingInAnyOrder.containsInAnyOrder(expected.get(partitionId).toArray()));
     }
   }
 
-  private static final class CheckAgainstExpected<M> implements 
SinkFunction<M> {
-    private static final long TIMEOUT = 5000L;
-
-    private final String id;
-    private final boolean checkEachTask;
-    private final Collection<M> expected;
-
-
-    private transient Timer timer = new Timer();
-    private transient List<M> actual = Collections.synchronizedList(new 
ArrayList<>());
-    private transient TimerTask timerTask = new TimerTask() {
-      @Override
-      public void run() {
-        check();
-      }
-    };
-
-    CheckAgainstExpected(String id, Collection<M> expected, boolean 
checkEachTask) {
-      this.id = id;
-      this.expected = expected;
-      this.checkEachTask = checkEachTask;
-    }
-
-    @Override
-    public void init(Config config, TaskContext context) {
-      final SystemStreamPartition ssp = 
Iterables.getFirst(context.getSystemStreamPartitions(), null);
-      if (ssp == null ? false : ssp.getPartition().getPartitionId() == 0) {
-        final int count = checkEachTask ? 
context.getSamzaContainerContext().taskNames.size() : 1;
-        LATCHES.put(id, new CountDownLatch(count));
-        timer.schedule(timerTask, TIMEOUT);
-      }
-    }
-
-    @Override
-    public void apply(M message, MessageCollector messageCollector, 
TaskCoordinator taskCoordinator) {
-      actual.add(message);
-
-      if (actual.size() >= expected.size()) {
-        timerTask.cancel();
-        check();
-      }
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-      in.defaultReadObject();
-      timer = new Timer();
-      actual = Collections.synchronizedList(new ArrayList<>());
-      timerTask = new TimerTask() {
-        @Override
-        public void run() {
-          check();
-        }
-      };
-    }
+  /**
+   * Util to assert ordering of messages in a stream with single partition
+   *
+   * @param collectionStream represents the actual stream which will be 
consumed to compare against expected list
+   * @param expected represents the expected stream of messages
+   * @param timeout maximum time to wait for consuming the stream
+   * @param <M> represents the type of Message in the stream
+   * @throws InterruptedException when {@code consumeStream} is interrupted by 
another thread during polling messages
+   */
+  public static <M> void containsInOrder(CollectionStream<M> collectionStream, 
final List<M> expected, Duration timeout)
+      throws InterruptedException {
+    Preconditions.checkNotNull(collectionStream, "This util is intended to use 
only on CollectionStream");
+    assertThat(TestRunner.consumeStream(collectionStream, timeout)
+        .entrySet()
+        .stream()
+        .flatMap(entry -> entry.getValue().stream())
+        .collect(Collectors.toList()), 
IsIterableContainingInOrder.contains(expected.toArray()));
+  }
 
-    private void check() {
-      final CountDownLatch latch = LATCHES.get(id);
-      try {
-        assertThat(actual, Matchers.containsInAnyOrder((M[]) 
expected.toArray()));
-      } finally {
-        latch.countDown();
-      }
+  /**
+   * Util to assert ordering of messages in a multi-partitioned stream
+   *
+   * @param collectionStream represents the actual stream which will be 
consumed to compare against expected partition map
+   * @param expected represents a map of partitionId as key and list of 
messages as value
+   * @param timeout maximum time to wait for consuming the stream
+   * @param <M> represents the type of Message in the stream
+   * @throws InterruptedException when {@code consumeStream} is interrupted by 
another thread during polling messages
+   */
+  public static <M> void containsInOrder(CollectionStream<M> collectionStream, 
final Map<Integer, List<M>> expected,
+      Duration timeout) throws InterruptedException {
+    Preconditions.checkNotNull(collectionStream, "This util is intended to use 
only on CollectionStream");
+    Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, 
timeout);
+    for (Integer partitionId : expected.keySet()) {
+      assertThat(actual.get(partitionId), IsIterableContainingInOrder.contains(expected.get(partitionId).toArray()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 6e647d9..3c45967 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -311,7 +311,7 @@ public class TestRunner {
    *         i.e messages in the partition
    * @throws InterruptedException Thrown when a blocking poll has been 
interrupted by another thread.
    */
-  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream 
stream, Integer timeout) throws InterruptedException {
+  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream 
stream, Duration timeout) throws InterruptedException {
     Preconditions.checkNotNull(stream);
     Preconditions.checkNotNull(stream.getSystemName());
     String streamName = stream.getStreamName();
@@ -334,7 +334,7 @@ public class TestRunner {
     long t = System.currentTimeMillis();
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new 
HashMap<>();
     HashSet<SystemStreamPartition> didNotReachEndOfStream = new 
HashSet<>(ssps);
-    while (System.currentTimeMillis() < t + timeout) {
+    while (System.currentTimeMillis() < t + timeout.toMillis()) {
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = 
consumer.poll(ssps, 10);
       for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> 
entry : currentState.entrySet()) {
         SystemStreamPartition ssp = entry.getKey();

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
 
b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
index b3d9485..320a0ac 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
@@ -104,7 +104,7 @@ public class CollectionStream<T> {
   private CollectionStream(String systemName, String streamName, Map<Integer, 
? extends Iterable<T>> initPartitions) {
     this(systemName, streamName);
     Preconditions.checkNotNull(initPartitions);
-    initPartitions = new HashMap<>(initPartitions);
+    this.initPartitions = new HashMap<>(initPartitions);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index ad25cae..3a1eba0 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -20,8 +20,13 @@
 package org.apache.samza.test.framework;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.operators.KV;
 import org.apache.samza.test.framework.stream.CollectionStream;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
@@ -44,10 +49,82 @@ public class AsyncStreamTaskIntegrationTest {
         .addOutputStream(output)
         .run(Duration.ofSeconds(2));
 
-    Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
+    Assert.assertThat(TestRunner.consumeStream(output, 
Duration.ofMillis(1000)).get(0),
         IsIterableContainingInOrder.contains(outputList.toArray()));
   }
 
+  @Test
+  public void testAsyncTaskWithSinglePartitionUsingStreamAssert() throws 
Exception {
+    List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputList = Arrays.asList(50, 10, 20, 30, 40);
+
+    CollectionStream<Integer> input = CollectionStream.of("async-test", 
"ints", inputList);
+    CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+
+    TestRunner
+        .of(MyAsyncStreamTask.class)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInAnyOrder(output, outputList, 
Duration.ofMillis(1000));
+  }
+
+  @Test
+  public void testAsyncTaskWithMultiplePartition() throws Exception {
+    Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
+    Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputPartition = partition.stream().map(x -> x * 
10).collect(Collectors.toList());
+    for (int i = 0; i < 5; i++) {
+      List<KV> keyedPartition = new ArrayList<>();
+      for (Integer val : partition) {
+        keyedPartition.add(KV.of(i, val));
+      }
+      inputPartitionData.put(i, keyedPartition);
+      expectedOutputPartitionData.put(i, new 
ArrayList<Integer>(outputPartition));
+    }
+
+    CollectionStream<KV> inputStream = CollectionStream.of("async-test", 
"ints", inputPartitionData);
+    CollectionStream outputStream = CollectionStream.empty("async-test", 
"ints-out", 5);
+
+    TestRunner
+        .of(MyAsyncStreamTask.class)
+        .addInputStream(inputStream)
+        .addOutputStream(outputStream)
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, 
Duration.ofMillis(1000));
+  }
+
+  @Test
+  public void testAsyncTaskWithMultiplePartitionMultithreaded() throws 
Exception {
+    Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
+    Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputPartition = partition.stream().map(x -> x * 
10).collect(Collectors.toList());
+    for (int i = 0; i < 5; i++) {
+      List<KV> keyedPartition = new ArrayList<>();
+      for (Integer val : partition) {
+        keyedPartition.add(KV.of(i, val));
+      }
+      inputPartitionData.put(i, keyedPartition);
+      expectedOutputPartitionData.put(i, new 
ArrayList<Integer>(outputPartition));
+    }
+
+    CollectionStream<KV> inputStream = CollectionStream.of("async-test", 
"ints", inputPartitionData);
+    CollectionStream outputStream = CollectionStream.empty("async-test", 
"ints-out", 5);
+
+    TestRunner
+        .of(MyAsyncStreamTask.class)
+        .addInputStream(inputStream)
+        .addOutputStream(outputStream)
+        .addOverrideConfig("task.max.concurrency", "4")
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInAnyOrder(outputStream, expectedOutputPartitionData, 
Duration.ofMillis(1000));
+  }
+
   /**
    * Job should fail because it times out too soon
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
new file mode 100644
index 0000000..ec52aa4
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.test.operator.data.PageView;
+
+import java.util.Arrays;
+
+public class BroadcastAssertApp implements StreamApplication {
+
+  public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
+
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
+
+    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    final MessageStream<PageView> broadcastPageViews = graph
+        .getInputStream(inputTopic, serde)
+        .broadcast(serde, "pv");
+
+    /**
+     * Each task will see all the pageview events
+     */
+    MessageStreamAssert.that("Each task contains all broadcast PageView 
events", broadcastPageViews, serde)
+        .forEachTask()
+        .containsInAnyOrder(
+            Arrays.asList(
+                new PageView("v1", "p1", "u1"),
+                new PageView("v2", "p2", "u1"),
+                new PageView("v3", "p1", "u2"),
+                new PageView("v4", "p3", "u2")
+            ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java
index 347e766..4ecb4b6 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java
@@ -60,7 +60,8 @@ class RestCall extends Thread {
       System.out.println("Thread " + this.getName() + " interrupted.");
     }
     Integer obj = (Integer) envelope.getMessage();
-    messageCollector.send(new OutgoingMessageEnvelope(new 
SystemStream("async-test", "ints-out"), obj * 10));
+    messageCollector.send(new OutgoingMessageEnvelope(new 
SystemStream("async-test", "ints-out"),
+        envelope.getKey(), envelope.getKey(), obj * 10));
     callback.complete();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
index c83e461..a07fe74 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
@@ -32,6 +32,7 @@ public class MyStreamTestTask implements StreamTask {
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
       throws Exception {
     Integer obj = (Integer) envelope.getMessage();
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("test", 
"output"), obj * 10));
+    collector.send(new OutgoingMessageEnvelope(new SystemStream("test", 
"output"),
+        envelope.getKey(), envelope.getKey(), obj * 10));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 8ac40e1..ba4c985 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -20,7 +20,9 @@ package org.apache.samza.test.framework;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -60,10 +62,12 @@ public class StreamApplicationIntegrationTest {
     Random random = new Random();
     int count = 10;
     List<PageView> pageviews = new ArrayList<>(count);
+    Map<Integer, List<PageView>> expectedOutput = new HashMap<>();
     for (int i = 0; i < count; i++) {
       String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
       int memberId = i;
-      pageviews.add(new PageView(pagekey, memberId));
+      PageView pv = new PageView(pagekey, memberId);
+      pageviews.add(pv);
     }
 
     CollectionStream<PageView> input = CollectionStream.of("test", "PageView", 
pageviews);
@@ -76,7 +80,7 @@ public class StreamApplicationIntegrationTest {
         .addOverrideConfig("job.default.system", "test")
         .run(Duration.ofMillis(1500));
 
-    Assert.assertEquals(TestRunner.consumeStream(output, 
10000).get(random.nextInt(count)).size(), 1);
+    Assert.assertEquals(TestRunner.consumeStream(output, 
Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
   }
 
   public static final class Values {
@@ -124,4 +128,5 @@ public class StreamApplicationIntegrationTest {
         .addOverrideConfig("job.default.system", "test")
         .run(Duration.ofMillis(1000));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
new file mode 100644
index 0000000..810d2c2
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.framework;
+
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.execution.TestStreamManager;
+import org.apache.samza.runtime.AbstractApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.kafka.KafkaSystemAdmin;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import scala.Option;
+import scala.Option$;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Harness for writing integration tests for {@link StreamApplication}s.
+ *
+ * <p> This provides the following features for its sub-classes:
+ * <ul>
+ *   <li>
+ *     Automatic Setup and teardown: Any non-trivial integration test brings 
up components like Zookeeper
+ *     servers, Kafka brokers, Kafka producers and Kafka consumers. This 
harness initializes each
+ *     of these components in {@link #setUp()} and shuts down each of them 
cleanly in {@link #tearDown()}.
+ *     {@link #setUp()} and {@link #tearDown()} are automatically invoked from 
the Junit runner.
+ *   <li>
+ *     Interaction with Kafka: The harness provides convenience methods to 
interact with Kafka brokers, consumers
+ *     and producers - for instance, methods to create topics, produce and 
consume messages.
+ *   <li>
+ *     Config defaults: Often Samza integration tests have to set up config 
boilerplate
+ *     to perform even simple tasks like producing and consuming messages. 
This harness provides default string
+ *     serdes for producing / consuming messages and a default system-alias 
named "kafka" that uses the
+ *     {@link org.apache.samza.system.kafka.KafkaSystemFactory}.
+ *   <li>
+ *     Debugging: At times, it is convenient to debug integration tests 
locally from an IDE. This harness
+ *     runs all its components (including Kafka brokers, Zookeeper servers and 
Samza) locally.
+ * </ul>
+ *
+ * <p> <i> Implementation Notes: </i> <br/>
+ * State persistence: {@link #tearDown()} clears all associated state 
(including topics and metadata) in Kafka and
+ * Zookeeper. Hence, the state is not durable across invocations of {@link 
#tearDown()} <br/>
+ *
+ * Execution model: {@link StreamApplication}s are run as their own {@link 
org.apache.samza.job.local.ThreadJob}s.
+ * Similarly, embedded Kafka servers and Zookeeper servers are run as their 
own threads.
+ * {@link #produceMessage(String, int, String, String)} and {@link 
#consumeMessages(Collection, int)} are blocking calls.
+ *
+ * <h3>Usage Example</h3>
+ * Here is an actual test that publishes a message into Kafka, runs an 
application, and verifies consumption
+ * from the output topic.
+ *
+ *  <pre> {@code
+ * class MyTest extends StreamApplicationIntegrationTestHarness {
+ *   private final StreamApplication myApp = new MyStreamApplication();
+ *   private final Collection<String> outputTopics = 
Collections.singletonList("output-topic");
+ *   @Test
+ *   public void test() {
+ *     createTopic("mytopic", 1);
+ *     produceMessage("mytopic", 0, "key1", "val1");
+ *     runApplication(myApp, "myApp", null);
+ *     List<ConsumerRecord<String, String>> messages = 
consumeMessages(outputTopics, 1);
+ *     Assert.assertEquals(messages.size(), 1);
+ *   }
+ * }}</pre>
+ */
+public class StreamApplicationIntegrationTestHarness extends 
AbstractIntegrationTestHarness {
+  private KafkaProducer producer;
+  private KafkaConsumer consumer;
+  protected KafkaSystemAdmin systemAdmin;
+
+  private int numEmptyPolls = 3;
+  private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
+  private static final String DEFAULT_DESERIALIZER = 
"org.apache.kafka.common.serialization.StringDeserializer";
+
+  /**
+   * Starts a single kafka broker, and a single embedded zookeeper server in 
their own threads.
+   * Sub-classes should invoke {@link #zkConnect()} and {@link 
#bootstrapUrl()}s to
+   * obtain the urls (and ports) of the started zookeeper and kafka broker.
+   */
+  @Override
+  public void setUp() {
+    super.setUp();
+
+    Properties consumerDeserializerProperties = new Properties();
+    consumerDeserializerProperties.setProperty("key.deserializer", 
DEFAULT_DESERIALIZER);
+    consumerDeserializerProperties.setProperty("value.deserializer", 
DEFAULT_DESERIALIZER);
+
+    producer = TestUtils.createNewProducer(
+        bootstrapServers(), // bootstrap-server url
+        1, // acks
+        60 * 1000L, // maxBlockMs
+        1024L * 1024L, // buffer size
+        0, // numRetries
+        0L, // lingerMs
+        5 * 1000L, // requestTimeout
+        SecurityProtocol.PLAINTEXT,
+        null,
+        Option.apply(new Properties()),
+        new StringSerializer(),
+        new StringSerializer(),
+        Option.apply(new Properties()));
+
+    consumer = TestUtils.createNewConsumer(
+        bootstrapServers(),
+        "group", // groupId
+        "earliest", // auto-offset-reset
+        4096L, // per-partition fetch size
+        "org.apache.kafka.clients.consumer.RangeAssignor", // partition 
Assigner
+        30000,
+        SecurityProtocol.PLAINTEXT,
+        Option$.MODULE$.<File>empty(),
+        Option$.MODULE$.<Properties>empty(),
+        Option$.MODULE$.<Properties>apply(consumerDeserializerProperties));
+
+    systemAdmin = createSystemAdmin("kafka");
+    systemAdmin.start();
+  }
+
+  /**
+   * Creates a kafka topic with the provided name and the number of partitions
+   * @param topicName the name of the topic
+   * @param numPartitions the number of partitions in the topic
+   */
+  public void createTopic(String topicName, int numPartitions) {
+    TestUtils.createTopic(zkUtils(), topicName, numPartitions, 1, servers(), 
new Properties());
+  }
+
+  /**
+   * Produces a message to the provided topic partition.
+   * @param topicName the topic to produce messages to
+   * @param partitionId the topic partition to produce messages to
+   * @param key the key in the message
+   * @param val the value in the message
+   */
+  public void produceMessage(String topicName, int partitionId, String key, 
String val) {
+    producer.send(new ProducerRecord(topicName, partitionId, key, val));
+    producer.flush();
+  }
+
+  @Override
+  public int clusterSize() {
+    return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
+  }
+
+
+  /**
+   * Read messages from the provided list of topics until {@code threshold} 
messages have been read or until
+   * {@link #numEmptyPolls} polls return no messages.
+   *
+   * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and 
the number of empty polls are
+   * determined by {@link #numEmptyPolls}
+   *
+   * @param topics the list of topics to consume from
+   * @param threshold the number of messages to consume
+   * @return the list of {@link ConsumerRecord}s whose size can be at most 
{@code threshold}
+   */
+  public List<ConsumerRecord<String, String>> 
consumeMessages(Collection<String> topics, int threshold) {
+    int emptyPollCount = 0;
+    List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
+    consumer.subscribe(topics);
+
+    while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) {
+      ConsumerRecords<String, String> records = 
consumer.poll(POLL_TIMEOUT_MS.toMillis());
+      if (!records.isEmpty()) {
+        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
+        while (iterator.hasNext() && recordList.size() < threshold) {
+          ConsumerRecord record = iterator.next();
+          recordList.add(record);
+          emptyPollCount = 0;
+        }
+      } else {
+        emptyPollCount++;
+      }
+    }
+    return recordList;
+  }
+
+  /**
+   * Executes the provided {@link StreamApplication} as a {@link 
org.apache.samza.job.local.ThreadJob}. The
+   * {@link StreamApplication} runs in its own separate thread.
+   *
+   * @param streamApplication the application to run
+   * @param appName the name of the application
+   * @param overriddenConfigs configs to override
+   * @return RunApplicationContext which contains objects created within 
runApplication, to be used for verification
+   * if necessary
+   */
+  protected RunApplicationContext runApplication(StreamApplication 
streamApplication,
+      String appName,
+      Map<String, String> overriddenConfigs) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("job.factory.class", 
"org.apache.samza.job.local.ThreadJobFactory");
+    configMap.put("job.name", appName);
+    configMap.put("app.class", 
streamApplication.getClass().getCanonicalName());
+    configMap.put("serializers.registry.json.class", 
"org.apache.samza.serializers.JsonSerdeFactory");
+    configMap.put("serializers.registry.string.class", 
"org.apache.samza.serializers.StringSerdeFactory");
+    configMap.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
+    configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configMap.put("systems.kafka.samza.key.serde", "string");
+    configMap.put("systems.kafka.samza.msg.serde", "string");
+    configMap.put("systems.kafka.samza.offset.default", "oldest");
+    configMap.put("job.coordinator.system", "kafka");
+    configMap.put("job.default.system", "kafka");
+    configMap.put("job.coordinator.replication.factor", "1");
+    configMap.put("task.window.ms", "1000");
+    configMap.put("task.checkpoint.factory", 
TestStreamManager.MockCheckpointManagerFactory.class.getName());
+
+    // This is to prevent tests from taking a long time to stop after they're 
done. The issue is that
+    // tearDown currently doesn't call runner.kill(app), and shuts down the 
Kafka and ZK servers immediately.
+    // The test process then exits, triggering the SamzaContainer shutdown 
hook, which in turn tries to flush any
+    // store changelogs, which then get stuck trying to produce to the stopped 
Kafka server.
+    // Calling runner.kill doesn't work since RemoteApplicationRunner creates 
a new ThreadJob instance when
+    // kill is called. We can't use LocalApplicationRunner since 
ZkJobCoordinator doesn't currently create
+    // changelog streams. Hence we just force an unclean shutdown here too. 
This _should be_ OK
+    // since the test method has already executed by the time the shutdown 
hook is called. The side effect is
+    // that buffered state (e.g. changelog contents) might not be flushed 
correctly after the test run.
+    configMap.put("task.shutdown.ms", "1");
+
+    if (overriddenConfigs != null) {
+      configMap.putAll(overriddenConfigs);
+    }
+
+    Config config = new MapConfig(configMap);
+    AbstractApplicationRunner runner = (AbstractApplicationRunner) 
ApplicationRunner.fromConfig(config);
+    runner.run(streamApplication);
+
+    MessageStreamAssert.waitForComplete();
+    return new RunApplicationContext(runner, config);
+  }
+
+  public void setNumEmptyPolls(int numEmptyPolls) {
+    this.numEmptyPolls = numEmptyPolls;
+  }
+
+  /**
+   * Shutdown and clear Zookeeper and Kafka broker state.
+   */
+  @Override
+  public void tearDown() {
+    systemAdmin.stop();
+    producer.close();
+    consumer.close();
+    super.tearDown();
+  }
+
+  /**
+   * Container for any necessary context created during runApplication. Allows 
tests to access objects created within
+   * runApplication in order to do verification.
+   */
+  protected static class RunApplicationContext {
+    private final AbstractApplicationRunner runner;
+    private final Config config;
+
+    private RunApplicationContext(AbstractApplicationRunner runner, Config 
config) {
+      this.runner = runner;
+      this.config = config;
+    }
+
+    public AbstractApplicationRunner getRunner() {
+      return this.runner;
+    }
+
+    public Config getConfig() {
+      return this.config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2cc5977..f888b4a 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -20,14 +20,20 @@
 package org.apache.samza.test.framework;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
+import org.apache.samza.operators.KV;
 import org.apache.samza.test.framework.stream.CollectionStream;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
 import org.junit.Test;
 
+
 public class StreamTaskIntegrationTest {
 
   @Test
@@ -40,7 +46,7 @@ public class StreamTaskIntegrationTest {
 
     
TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
 
-    Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
+    Assert.assertThat(TestRunner.consumeStream(output, 
Duration.ofMillis(1000)).get(0),
         IsIterableContainingInOrder.contains(outputList.toArray()));
   }
 
@@ -54,11 +60,79 @@ public class StreamTaskIntegrationTest {
     CollectionStream<Double> input = CollectionStream.of("test", "doubles", 
inputList);
     CollectionStream output = CollectionStream.empty("test", "output");
 
+    
TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
+  }
+
+  @Test
+  public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception {
+    List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
+
+    CollectionStream<Integer> input = CollectionStream.of("test", "input", 
inputList);
+    CollectionStream output = CollectionStream.empty("test", "output");
+
     TestRunner
         .of(MyStreamTestTask.class)
         .addInputStream(input)
         .addOutputStream(output)
+        .addOverrideConfig("job.container.thread.pool.size", "4")
         .run(Duration.ofSeconds(1));
+
+    StreamAssert.containsInOrder(output, outputList, Duration.ofMillis(1000));
   }
 
+  @Test
+  public void testSyncTaskWithMultiplePartition() throws Exception {
+    Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
+    Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputPartition = partition.stream().map(x -> x * 
10).collect(Collectors.toList());
+    for (int i = 0; i < 5; i++) {
+      List<KV> keyedPartition = new ArrayList<>();
+      for (Integer val : partition) {
+        keyedPartition.add(KV.of(i, val));
+      }
+      inputPartitionData.put(i, keyedPartition);
+      expectedOutputPartitionData.put(i, new 
ArrayList<Integer>(outputPartition));
+    }
+
+    CollectionStream<KV> inputStream = CollectionStream.of("test", "input", 
inputPartitionData);
+    CollectionStream outputStream = CollectionStream.empty("test", "output", 
5);
+
+    TestRunner
+        .of(MyStreamTestTask.class)
+        .addInputStream(inputStream)
+        .addOutputStream(outputStream)
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, 
Duration.ofMillis(1000));
+  }
+
+  @Test
+  public void testSyncTaskWithMultiplePartitionMultithreaded() throws 
Exception {
+    Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
+    Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputPartition = partition.stream().map(x -> x * 
10).collect(Collectors.toList());
+    for (int i = 0; i < 5; i++) {
+      List<KV> keyedPartition = new ArrayList<>();
+      for (Integer val : partition) {
+        keyedPartition.add(KV.of(i, val));
+      }
+      inputPartitionData.put(i, keyedPartition);
+      expectedOutputPartitionData.put(i, new 
ArrayList<Integer>(outputPartition));
+    }
+
+    CollectionStream<KV> inputStream = CollectionStream.of("test", "input", 
inputPartitionData);
+    CollectionStream outputStream = CollectionStream.empty("test", "output", 
5);
+
+    TestRunner
+        .of(MyStreamTestTask.class)
+        .addInputStream(inputStream)
+        .addOutputStream(outputStream)
+        .addOverrideConfig("job.container.thread.pool.size", "4")
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, 
Duration.ofMillis(1000));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java 
b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
new file mode 100644
index 0000000..09da838
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.test.operator.data.PageView;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class TestTimerApp implements StreamApplication {
+  public static final String PAGE_VIEWS = "page-views";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, 
serde);
+    final MessageStream<PageView> output = pageViews.flatMap(new 
FlatmapTimerFn());
+
+    MessageStreamAssert.that("Output from timer function should container all 
complete messages", output, serde)
+        .containsInAnyOrder(
+            Arrays.asList(
+                new PageView("v1-complete", "p1", "u1"),
+                new PageView("v2-complete", "p2", "u1"),
+                new PageView("v3-complete", "p1", "u2"),
+                new PageView("v4-complete", "p3", "u2")
+            ));
+  }
+
+  private static class FlatmapTimerFn implements FlatMapFunction<PageView, 
PageView>, TimerFunction<String, PageView> {
+
+    private transient List<PageView> pageViews;
+    private transient TimerRegistry<String> timerRegistry;
+
+    @Override
+    public void registerTimer(TimerRegistry<String> timerRegistry) {
+      this.timerRegistry = timerRegistry;
+      this.pageViews = new ArrayList<>();
+    }
+
+    @Override
+    public Collection<PageView> apply(PageView message) {
+      final PageView pv = new PageView(message.getViewId() + "-complete", 
message.getPageId(), message.getUserId());
+      pageViews.add(pv);
+
+      if (pageViews.size() == 2) {
+        //got all messages for this task
+        final long time = System.currentTimeMillis() + 100;
+        timerRegistry.register("CompleteTimer", time);
+      }
+      return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<PageView> onTimer(String key, long time) {
+      return pageViews;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java 
b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
new file mode 100644
index 0000000..a48409c
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.apache.samza.test.framework.TestTimerApp.PAGE_VIEWS;
+
+public class TimerTest extends StreamApplicationIntegrationTestHarness {
+
+  @Before
+  public void setup() {
+    // create topics
+    createTopic(PAGE_VIEWS, 2);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", 
"{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", 
"{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+  }
+
+  @Test
+  public void testJob() {
+    runApplication(new TestTimerApp(), "TimerTest", null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
deleted file mode 100644
index 88e2765..0000000
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.operator;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.test.operator.data.PageView;
-import org.apache.samza.test.framework.StreamAssert;
-
-import java.util.Arrays;
-
-public class BroadcastAssertApp implements StreamApplication {
-
-  public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
-
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
-
-    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
-    final MessageStream<PageView> broadcastPageViews = graph
-        .getInputStream(inputTopic, serde)
-        .broadcast(serde, "pv");
-
-    /**
-     * Each task will see all the pageview events
-     */
-    StreamAssert.that("Each task contains all broadcast PageView events", 
broadcastPageViews, serde)
-        .forEachTask()
-        .containsInAnyOrder(
-            Arrays.asList(
-                new PageView("v1", "p1", "u1"),
-                new PageView("v2", "p2", "u1"),
-                new PageView("v3", "p1", "u2"),
-                new PageView("v4", "p3", "u2")
-            ));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
deleted file mode 100644
index 3c11533..0000000
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-import kafka.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.execution.TestStreamManager;
-import org.apache.samza.runtime.AbstractApplicationRunner;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.kafka.KafkaSystemAdmin;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.framework.StreamAssert;
-import scala.Option;
-import scala.Option$;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Harness for writing integration tests for {@link StreamApplication}s.
- *
- * <p> This provides the following features for its sub-classes:
- * <ul>
- *   <li>
- *     Automatic Setup and teardown: Any non-trivial integration test brings 
up components like Zookeeper
- *     servers, Kafka brokers, Kafka producers and Kafka consumers. This 
harness initializes each
- *     of these components in {@link #setUp()} and shuts down each of them 
cleanly in {@link #tearDown()}.
- *     {@link #setUp()} and {@link #tearDown()} are automatically invoked from 
the Junit runner.
- *   <li>
- *     Interaction with Kafka: The harness provides convenience methods to 
interact with Kafka brokers, consumers
- *     and producers - for instance, methods to create topics, produce and 
consume messages.
- *   <li>
- *     Config defaults: Often Samza integration tests have to setup config 
boiler plate
- *     to perform even simple tasks like producing and consuming messages. 
This harness provides default string
- *     serdes for producing / consuming messages and a default system-alias 
named "kafka" that uses the
- *     {@link org.apache.samza.system.kafka.KafkaSystemFactory}.
- *   <li>
- *     Debugging: At times, it is convenient to debug integration tests 
locally from an IDE. This harness
- *     runs all its components (including Kafka brokers, Zookeeper servers and 
Samza) locally.
- * </ul>
- *
- * <p> <i> Implementation Notes: </i> <br/>
- * State persistence: {@link #tearDown()} clears all associated state 
(including topics and metadata) in Kafka and
- * Zookeeper. Hence, the state is not durable across invocations of {@link 
#tearDown()} <br/>
- *
- * Execution model: {@link StreamApplication}s are run as their own {@link 
org.apache.samza.job.local.ThreadJob}s.
- * Similarly, embedded Kafka servers and Zookeeper servers are run as their 
own threads.
- * {@link #produceMessage(String, int, String, String)} and {@link 
#consumeMessages(Collection, int)} are blocking calls.
- *
- * <h3>Usage Example</h3>
- * Here is an actual test that publishes a message into Kafka, runs an 
application, and verifies consumption
- * from the output topic.
- *
- *  <pre> {@code
- * class MyTest extends StreamApplicationIntegrationTestHarness {
- *   private final StreamApplication myApp = new MyStreamApplication();
- *   private final Collection<String> outputTopics = 
Collections.singletonList("output-topic");
- *   @Test
- *   public void test() {
- *     createTopic("mytopic", 1);
- *     produceMessage("mytopic", 0, "key1", "val1");
- *     runApplication(myApp, "myApp", null);
- *     List<ConsumerRecord<String, String>> messages = 
consumeMessages(outputTopics)
- *     Assert.assertEquals(messages.size(), 1);
- *   }
- * }}</pre>
- */
-public class StreamApplicationIntegrationTestHarness extends 
AbstractIntegrationTestHarness {
-  private KafkaProducer producer;
-  private KafkaConsumer consumer;
-  protected KafkaSystemAdmin systemAdmin;
-
-  private int numEmptyPolls = 3;
-  private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
-  private static final String DEFAULT_DESERIALIZER = 
"org.apache.kafka.common.serialization.StringDeserializer";
-
-  /**
-   * Starts a single kafka broker, and a single embedded zookeeper server in 
their own threads.
-   * Sub-classes should invoke {@link #zkConnect()} and {@link 
#bootstrapUrl()}s to
-   * obtain the urls (and ports) of the started zookeeper and kafka broker.
-   */
-  @Override
-  public void setUp() {
-    super.setUp();
-
-    Properties consumerDeserializerProperties = new Properties();
-    consumerDeserializerProperties.setProperty("key.deserializer", 
DEFAULT_DESERIALIZER);
-    consumerDeserializerProperties.setProperty("value.deserializer", 
DEFAULT_DESERIALIZER);
-
-    producer = TestUtils.createNewProducer(
-        bootstrapServers(), // bootstrap-server url
-        1, // acks
-        60 * 1000L, // maxBlockMs
-        1024L * 1024L, // buffer size
-        0, // numRetries
-        0L, // lingerMs
-        5 * 1000L, // requestTimeout
-        SecurityProtocol.PLAINTEXT,
-        null,
-        Option.apply(new Properties()),
-        new StringSerializer(),
-        new StringSerializer(),
-        Option.apply(new Properties()));
-
-    consumer = TestUtils.createNewConsumer(
-        bootstrapServers(),
-        "group", // groupId
-        "earliest", // auto-offset-reset
-        4096L, // per-partition fetch size
-        "org.apache.kafka.clients.consumer.RangeAssignor", // partition 
Assigner
-        30000,
-        SecurityProtocol.PLAINTEXT,
-        Option$.MODULE$.<File>empty(),
-        Option$.MODULE$.<Properties>empty(),
-        Option$.MODULE$.<Properties>apply(consumerDeserializerProperties));
-
-    systemAdmin = createSystemAdmin("kafka");
-    systemAdmin.start();
-  }
-
-  /**
-   * Creates a kafka topic with the provided name and the number of partitions
-   * @param topicName the name of the topic
-   * @param numPartitions the number of partitions in the topic
-   */
-  public void createTopic(String topicName, int numPartitions) {
-    TestUtils.createTopic(zkUtils(), topicName, numPartitions, 1, servers(), 
new Properties());
-  }
-
-  /**
-   * Produces a message to the provided topic partition.
-   * @param topicName the topic to produce messages to
-   * @param partitionId the topic partition to produce messages to
-   * @param key the key in the message
-   * @param val the value in the message
-   */
-  public void produceMessage(String topicName, int partitionId, String key, 
String val) {
-    producer.send(new ProducerRecord(topicName, partitionId, key, val));
-    producer.flush();
-  }
-
-  @Override
-  public int clusterSize() {
-    return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
-  }
-
-
-  /**
-   * Read messages from the provided list of topics until {@param threshold} 
messages have been read or until
-   * {@link #numEmptyPolls} polls return no messages.
-   *
-   * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and 
the number of empty polls are
-   * determined by {@link #numEmptyPolls}
-   *
-   * @param topics the list of topics to consume from
-   * @param threshold the number of messages to consume
-   * @return the list of {@link ConsumerRecord}s whose size can be atmost 
{@param threshold}
-   */
-  public List<ConsumerRecord<String, String>> 
consumeMessages(Collection<String> topics, int threshold) {
-    int emptyPollCount = 0;
-    List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);
-
-    while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) {
-      ConsumerRecords<String, String> records = 
consumer.poll(POLL_TIMEOUT_MS.toMillis());
-      if (!records.isEmpty()) {
-        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
-        while (iterator.hasNext() && recordList.size() < threshold) {
-          ConsumerRecord record = iterator.next();
-          recordList.add(record);
-          emptyPollCount = 0;
-        }
-      } else {
-        emptyPollCount++;
-      }
-    }
-    return recordList;
-  }
-
-  /**
-   * Executes the provided {@link StreamApplication} as a {@link 
org.apache.samza.job.local.ThreadJob}. The
-   * {@link StreamApplication} runs in its own separate thread.
-   *
-   * @param streamApplication the application to run
-   * @param appName the name of the application
-   * @param overriddenConfigs configs to override
-   * @return RunApplicationContext which contains objects created within 
runApplication, to be used for verification
-   * if necessary
-   */
-  protected RunApplicationContext runApplication(StreamApplication 
streamApplication,
-      String appName,
-      Map<String, String> overriddenConfigs) {
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put("job.factory.class", 
"org.apache.samza.job.local.ThreadJobFactory");
-    configMap.put("job.name", appName);
-    configMap.put("app.class", 
streamApplication.getClass().getCanonicalName());
-    configMap.put("serializers.registry.json.class", 
"org.apache.samza.serializers.JsonSerdeFactory");
-    configMap.put("serializers.registry.string.class", 
"org.apache.samza.serializers.StringSerdeFactory");
-    configMap.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
-    configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
-    configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
-    configMap.put("systems.kafka.samza.key.serde", "string");
-    configMap.put("systems.kafka.samza.msg.serde", "string");
-    configMap.put("systems.kafka.samza.offset.default", "oldest");
-    configMap.put("job.coordinator.system", "kafka");
-    configMap.put("job.default.system", "kafka");
-    configMap.put("job.coordinator.replication.factor", "1");
-    configMap.put("task.window.ms", "1000");
-    configMap.put("task.checkpoint.factory", 
TestStreamManager.MockCheckpointManagerFactory.class.getName());
-
-    // This is to prevent tests from taking a long time to stop after they're 
done. The issue is that
-    // tearDown currently doesn't call runner.kill(app), and shuts down the 
Kafka and ZK servers immediately.
-    // The test process then exits, triggering the SamzaContainer shutdown 
hook, which in turn tries to flush any
-    // store changelogs, which then get stuck trying to produce to the stopped 
Kafka server.
-    // Calling runner.kill doesn't work since RemoteApplicationRunner creates 
a new ThreadJob instance when
-    // kill is called. We can't use LocalApplicationRunner since 
ZkJobCoordinator doesn't currently create
-    // changelog streams. Hence we just force an unclean shutdown here to. 
This _should be_ OK
-    // since the test method has already executed by the time the shutdown 
hook is called. The side effect is
-    // that buffered state (e.g. changelog contents) might not be flushed 
correctly after the test run.
-    configMap.put("task.shutdown.ms", "1");
-
-    if (overriddenConfigs != null) {
-      configMap.putAll(overriddenConfigs);
-    }
-
-    Config config = new MapConfig(configMap);
-    AbstractApplicationRunner runner = (AbstractApplicationRunner) 
ApplicationRunner.fromConfig(config);
-    runner.run(streamApplication);
-
-    StreamAssert.waitForComplete();
-    return new RunApplicationContext(runner, config);
-  }
-
-  public void setNumEmptyPolls(int numEmptyPolls) {
-    this.numEmptyPolls = numEmptyPolls;
-  }
-
-  /**
-   * Shutdown and clear Zookeeper and Kafka broker state.
-   */
-  @Override
-  public void tearDown() {
-    systemAdmin.stop();
-    producer.close();
-    consumer.close();
-    super.tearDown();
-  }
-
-  /**
-   * Container for any necessary context created during runApplication. Allows 
tests to access objects created within
-   * runApplication in order to do verification.
-   */
-  protected static class RunApplicationContext {
-    private final AbstractApplicationRunner runner;
-    private final Config config;
-
-    private RunApplicationContext(AbstractApplicationRunner runner, Config 
config) {
-      this.runner = runner;
-      this.config = config;
-    }
-
-    public AbstractApplicationRunner getRunner() {
-      return this.runner;
-    }
-
-    public Config getConfig() {
-      return this.config;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index a9a4026..2f75103 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -27,6 +27,8 @@ import org.apache.samza.Partition;
 import org.apache.samza.system.SystemStreamMetadata;
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.kafka.KafkaSystemAdmin;
+import org.apache.samza.test.framework.BroadcastAssertApp;
+import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index fbc315f..058a690 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
 import org.apache.samza.test.operator.data.PageView;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java 
b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
deleted file mode 100644
index 94c1eca..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.timer;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.test.operator.data.PageView;
-import org.apache.samza.test.framework.StreamAssert;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-public class TestTimerApp implements StreamApplication {
-  public static final String PAGE_VIEWS = "page-views";
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
-    final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, 
serde);
-    final MessageStream<PageView> output = pageViews.flatMap(new 
FlatmapTimerFn());
-
-    StreamAssert.that("Output from timer function should container all 
complete messages", output, serde)
-        .containsInAnyOrder(
-            Arrays.asList(
-                new PageView("v1-complete", "p1", "u1"),
-                new PageView("v2-complete", "p2", "u1"),
-                new PageView("v3-complete", "p1", "u2"),
-                new PageView("v4-complete", "p3", "u2")
-            ));
-  }
-
-  private static class FlatmapTimerFn implements FlatMapFunction<PageView, 
PageView>, TimerFunction<String, PageView> {
-
-    private transient List<PageView> pageViews;
-    private transient TimerRegistry<String> timerRegistry;
-
-    @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
-      this.timerRegistry = timerRegistry;
-      this.pageViews = new ArrayList<>();
-    }
-
-    @Override
-    public Collection<PageView> apply(PageView message) {
-      final PageView pv = new PageView(message.getViewId() + "-complete", 
message.getPageId(), message.getUserId());
-      pageViews.add(pv);
-
-      if (pageViews.size() == 2) {
-        //got all messages for this task
-        final long time = System.currentTimeMillis() + 100;
-        timerRegistry.register("CompleteTimer", time);
-      }
-      return Collections.emptyList();
-    }
-
-    @Override
-    public Collection<PageView> onTimer(String key, long time) {
-      return pageViews;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java 
b/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
deleted file mode 100644
index 11b3aeb..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.timer;
-
-import org.apache.samza.test.operator.StreamApplicationIntegrationTestHarness;
-import org.junit.Before;
-import org.junit.Test;
-
-
-import static org.apache.samza.test.timer.TestTimerApp.PAGE_VIEWS;
-
-public class TimerTest extends StreamApplicationIntegrationTestHarness {
-
-  @Before
-  public void setup() {
-    // create topics
-    createTopic(PAGE_VIEWS, 2);
-
-    // create events for the following user activity.
-    // userId: (viewId, pageId, (adIds))
-    // u1: (v1, p1, (a1)), (v2, p2, (a3))
-    // u2: (v3, p1, (a1)), (v4, p3, (a5))
-    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
-    produceMessage(PAGE_VIEWS, 1, "p2", 
"{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
-    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
-    produceMessage(PAGE_VIEWS, 1, "p3", 
"{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
-
-  }
-
-  @Test
-  public void testJob() {
-    runApplication(new TestTimerApp(), "TimerTest", null);
-  }
-}

Reply via email to