Repository: samza Updated Branches: refs/heads/master 5587ebfec -> 13e26fb45
SAMZA-1759: Stream Assert utilities for low level and high level api for TestFramework Adding utilities and corresponding test for low and high level api Author: Sanil Jain <[email protected]> Reviewers: Shanthoosh Venkataraman <[email protected]> Closes #568 from Sanil15/SAMZA-1759 and squashes the following commits: a4861089 [Sanil Jain] Reverting back travis increase for wait time 876a3a58 [Sanil Jain] Increase travis timeout 9e6482b1 [Sanil Jain] Fixing travis build, removing unused imports 526244e8 [Sanil Jain] Merge branch 'master' into SAMZA-1759 9f489acf [Sanil Jain] Moving tests that use MessageStreamAssert to same package name in test folder to use package private a93e5a14 [Sanil Jain] Marking collection transient to ensure newer api changes work 5e6d3ed1 [Sanil Jain] Making MessageStreamAssert package private a5a521cc [Sanil Jain] Splitting operator assertions outside StreamAssert to MessageStreamAssert, addressing review, renaming utils d1e64180 [Sanil Jain] Cleaning unused imports ff218ff7 [Sanil Jain] Removing contains method for operator level assertios for high level api c5768772 [Sanil Jain] Merge branch 'SAMZA-1759' of https://github.com/Sanil15/samza into SAMZA-1759 c69d1bbb [Sanil Jain] StreamAssert Utilities for Low level and High Level Api, Adding More Test for Low Level api for testing multiple partitions and in mulithreaded mode e3c8e2a5 [Sanil Jain] StreamAssert Utilities for Low level and High Level Api, Adding More Test for Low Level api for testing multiple partitions and in mulithreaded mode Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/13e26fb4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/13e26fb4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/13e26fb4 Branch: refs/heads/master Commit: 13e26fb45b69c2f6f74a970475cb8239157d069e Parents: 5587ebf Author: Sanil Jain <[email protected]> Authored: Wed Aug 1 17:18:21 2018 -0700 Committer: Boris S 
<[email protected]> Committed: Wed Aug 1 17:18:21 2018 -0700 ---------------------------------------------------------------------- .travis.yml | 6 +- .../test/framework/MessageStreamAssert.java | 192 ++++++++++++ .../samza/test/framework/StreamAssert.java | 220 +++++--------- .../apache/samza/test/framework/TestRunner.java | 4 +- .../test/framework/stream/CollectionStream.java | 2 +- .../AsyncStreamTaskIntegrationTest.java | 79 ++++- .../test/framework/BroadcastAssertApp.java | 58 ++++ .../samza/test/framework/MyAsyncStreamTask.java | 3 +- .../samza/test/framework/MyStreamTestTask.java | 3 +- .../StreamApplicationIntegrationTest.java | 9 +- ...StreamApplicationIntegrationTestHarness.java | 302 ++++++++++++++++++ .../framework/StreamTaskIntegrationTest.java | 76 ++++- .../samza/test/framework/TestTimerApp.java | 86 ++++++ .../apache/samza/test/framework/TimerTest.java | 50 +++ .../samza/test/operator/BroadcastAssertApp.java | 59 ---- ...StreamApplicationIntegrationTestHarness.java | 303 ------------------- .../operator/TestRepartitionJoinWindowApp.java | 2 + .../test/operator/TestRepartitionWindowApp.java | 1 + .../apache/samza/test/timer/TestTimerApp.java | 87 ------ .../org/apache/samza/test/timer/TimerTest.java | 51 ---- 20 files changed, 937 insertions(+), 656 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index ef112f2..2a3ae0c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,15 +5,15 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# +# language: java http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java new file mode 100644 index 0000000..1a1c24c --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/MessageStreamAssert.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.framework; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.ObjectInputStream; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.hamcrest.Matchers; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertThat; + +/** + * An assertion on the content of a {@link MessageStream}. + * + * <pre>Example: {@code + * MessageStream<String> stream = streamGraph.getInputStream("input", serde).map(some_function)...; + * ... 
+ * MessageStreamAssert.that(id, stream, stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c")); + * }</pre> + * + */ +@VisibleForTesting +class MessageStreamAssert<M> { + private final static Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>(); + private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0); + + private final String id; + private final MessageStream<M> messageStream; + private final Serde<M> serde; + private boolean checkEachTask = false; + + /** + * Constructs a MessageStreamAssert with an id and serde + * @param id unique id + * @param messageStream represents messageStream that you want to assert on + * @param serde serde used to deserialize messageStream + * @param <M> represents type of Message + * @return MessageStreamAssert that returns the messages in the stream + */ + public static <M> MessageStreamAssert<M> that(String id, MessageStream<M> messageStream, Serde<M> serde) { + return new MessageStreamAssert<>(id, messageStream, serde); + } + + private MessageStreamAssert(String id, MessageStream<M> messageStream, Serde<M> serde) { + this.id = id; + this.messageStream = messageStream; + this.serde = serde; + } + + public MessageStreamAssert forEachTask() { + checkEachTask = true; + return this; + } + + public void containsInAnyOrder(final Collection<M> expected) { + LATCHES.putIfAbsent(id, PLACE_HOLDER); + final MessageStream<M> streamToCheck = checkEachTask + ? 
messageStream + : messageStream + .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), serde), null) + .map(kv -> kv.value); + + streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, checkEachTask)); + } + + public static void waitForComplete() { + try { + while (!LATCHES.isEmpty()) { + final Set<String> ids = new HashSet<>(LATCHES.keySet()); + for (String id : ids) { + while (LATCHES.get(id) == PLACE_HOLDER) { + Thread.sleep(100); + } + + final CountDownLatch latch = LATCHES.get(id); + if (latch != null) { + latch.await(); + LATCHES.remove(id); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final class CheckAgainstExpected<M> implements SinkFunction<M> { + private static final long TIMEOUT = 5000L; + + private final String id; + private final boolean checkEachTask; + private final transient Collection<M> expected; + + + private transient Timer timer = new Timer(); + private transient List<M> actual = Collections.synchronizedList(new ArrayList<>()); + private transient TimerTask timerTask = new TimerTask() { + @Override + public void run() { + check(); + } + }; + + CheckAgainstExpected(String id, Collection<M> expected, boolean checkEachTask) { + this.id = id; + this.expected = expected; + this.checkEachTask = checkEachTask; + } + + @Override + public void init(Config config, TaskContext context) { + final SystemStreamPartition ssp = Iterables.getFirst(context.getSystemStreamPartitions(), null); + if (ssp != null && ssp.getPartition().getPartitionId() == 0) { + final int count = checkEachTask ? 
context.getSamzaContainerContext().taskNames.size() : 1; + LATCHES.put(id, new CountDownLatch(count)); + timer.schedule(timerTask, TIMEOUT); + } + } + + @Override + public void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) { + actual.add(message); + + if (actual.size() >= expected.size()) { + timerTask.cancel(); + check(); + } + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + timer = new Timer(); + actual = Collections.synchronizedList(new ArrayList<>()); + timerTask = new TimerTask() { + @Override + public void run() { + check(); + } + }; + } + + private void check() { + final CountDownLatch latch = LATCHES.get(id); + try { + assertThat(actual, Matchers.containsInAnyOrder((M[]) expected.toArray())); + } finally { + latch.countDown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java index a1ac299..9972d7f 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java @@ -19,163 +19,95 @@ package org.apache.samza.test.framework; -import com.google.common.collect.Iterables; -import java.io.IOException; -import java.io.ObjectInputStream; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.SystemStreamPartition; 
-import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.hamcrest.Matchers; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; +import com.google.common.base.Preconditions; +import java.time.Duration; +import java.util.stream.Collectors; +import org.apache.samza.test.framework.stream.CollectionStream; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.hamcrest.collection.IsIterableContainingInOrder; import static org.junit.Assert.assertThat; + /** - * An assertion on the content of a {@link MessageStream}. - * - * <pre>Example: {@code - * MessageStream<String> stream = streamGraph.getInputStream("input", serde).map(some_function)...; - * ... - * StreamAssert.that(id, stream, stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c")); - * }</pre> - * + * Assertion utils on the content of a {@link CollectionStream}. 
*/ -public class StreamAssert<M> { - private final static Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>(); - private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0); - - private final String id; - private final MessageStream<M> messageStream; - private final Serde<M> serde; - private boolean checkEachTask = false; - - public static <M> StreamAssert<M> that(String id, MessageStream<M> messageStream, Serde<M> serde) { - return new StreamAssert<>(id, messageStream, serde); - } - - private StreamAssert(String id, MessageStream<M> messageStream, Serde<M> serde) { - this.id = id; - this.messageStream = messageStream; - this.serde = serde; - } - - public StreamAssert forEachTask() { - checkEachTask = true; - return this; +public class StreamAssert { + /** + * Util to assert presence of messages in a stream with single partition in any order + * + * @param collectionStream represents the actual stream which will be consumed to compare against expected list + * @param expected represents the expected stream of messages + * @param timeout maximum time to wait for consuming the stream + * @param <M> represents the type of Message in the stream + * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages + */ + public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout) + throws InterruptedException { + Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); + assertThat(TestRunner.consumeStream(collectionStream, timeout) + .entrySet() + .stream() + .flatMap(entry -> entry.getValue().stream()) + .collect(Collectors.toList()), IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray())); } - public void containsInAnyOrder(final Collection<M> expected) { - LATCHES.putIfAbsent(id, PLACE_HOLDER); - final MessageStream<M> streamToCheck = checkEachTask - ? 
messageStream - : messageStream - .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), serde), null) - .map(kv -> kv.value); - - streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, checkEachTask)); - } - - public static void waitForComplete() { - try { - while (!LATCHES.isEmpty()) { - final Set<String> ids = new HashSet<>(LATCHES.keySet()); - for (String id : ids) { - while (LATCHES.get(id) == PLACE_HOLDER) { - Thread.sleep(100); - } - - final CountDownLatch latch = LATCHES.get(id); - if (latch != null) { - latch.await(); - LATCHES.remove(id); - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); + /** + * Util to assert presence of messages in a stream with multiple partition in any order + * + * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map + * @param expected represents a map of partitionId as key and list of messages in stream as value + * @param timeout maximum time to wait for consuming the stream + * @param <M> represents the type of Message in the stream + * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages + * + */ + public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected, + Duration timeout) throws InterruptedException { + Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); + Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout); + for (Integer paritionId : expected.keySet()) { + assertThat(actual.get(paritionId), + IsIterableContainingInAnyOrder.containsInAnyOrder(expected.get(paritionId).toArray())); } } - private static final class CheckAgainstExpected<M> implements SinkFunction<M> { - private static final long TIMEOUT = 5000L; - - private final String id; - private final boolean checkEachTask; - private final Collection<M> expected; - - - private 
transient Timer timer = new Timer(); - private transient List<M> actual = Collections.synchronizedList(new ArrayList<>()); - private transient TimerTask timerTask = new TimerTask() { - @Override - public void run() { - check(); - } - }; - - CheckAgainstExpected(String id, Collection<M> expected, boolean checkEachTask) { - this.id = id; - this.expected = expected; - this.checkEachTask = checkEachTask; - } - - @Override - public void init(Config config, TaskContext context) { - final SystemStreamPartition ssp = Iterables.getFirst(context.getSystemStreamPartitions(), null); - if (ssp == null ? false : ssp.getPartition().getPartitionId() == 0) { - final int count = checkEachTask ? context.getSamzaContainerContext().taskNames.size() : 1; - LATCHES.put(id, new CountDownLatch(count)); - timer.schedule(timerTask, TIMEOUT); - } - } - - @Override - public void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) { - actual.add(message); - - if (actual.size() >= expected.size()) { - timerTask.cancel(); - check(); - } - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - timer = new Timer(); - actual = Collections.synchronizedList(new ArrayList<>()); - timerTask = new TimerTask() { - @Override - public void run() { - check(); - } - }; - } + /** + * Util to assert ordering of messages in a stream with single partition + * + * @param collectionStream represents the actual stream which will be consumed to compare against expected list + * @param expected represents the expected stream of messages + * @param timeout maximum time to wait for consuming the stream + * @param <M> represents the type of Message in the stream + * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages + */ + public static <M> void containsInOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout) + throws 
InterruptedException { + Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); + assertThat(TestRunner.consumeStream(collectionStream, timeout) + .entrySet() + .stream() + .flatMap(entry -> entry.getValue().stream()) + .collect(Collectors.toList()), IsIterableContainingInOrder.contains(expected.toArray())); + } - private void check() { - final CountDownLatch latch = LATCHES.get(id); - try { - assertThat(actual, Matchers.containsInAnyOrder((M[]) expected.toArray())); - } finally { - latch.countDown(); - } + /** + * Util to assert ordering of messages in a multi-partitioned stream + * + * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map + * @param expected represents a map of partitionId as key and list of messages as value + * @param timeout maximum time to wait for consuming the stream + * @param <M> represents the type of Message in the stream + * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages + */ + public static <M> void containsInOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected, + Duration timeout) throws InterruptedException { + Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); + Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout); + for (Integer paritionId : expected.keySet()) { + assertThat(actual.get(paritionId), IsIterableContainingInOrder.contains(expected.get(paritionId).toArray())); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 
6e647d9..3c45967 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -311,7 +311,7 @@ public class TestRunner { * i.e messages in the partition * @throws InterruptedException Thrown when a blocking poll has been interrupted by another thread. */ - public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Integer timeout) throws InterruptedException { + public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Duration timeout) throws InterruptedException { Preconditions.checkNotNull(stream); Preconditions.checkNotNull(stream.getSystemName()); String streamName = stream.getStreamName(); @@ -334,7 +334,7 @@ public class TestRunner { long t = System.currentTimeMillis(); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>(); HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps); - while (System.currentTimeMillis() < t + timeout) { + while (System.currentTimeMillis() < t + timeout.toMillis()) { Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = consumer.poll(ssps, 10); for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) { SystemStreamPartition ssp = entry.getKey(); http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java index b3d9485..320a0ac 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java @@ -104,7 +104,7 @@ 
public class CollectionStream<T> { private CollectionStream(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> initPartitions) { this(systemName, streamName); Preconditions.checkNotNull(initPartitions); - initPartitions = new HashMap<>(initPartitions); + this.initPartitions = new HashMap<>(initPartitions); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index ad25cae..3a1eba0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -20,8 +20,13 @@ package org.apache.samza.test.framework; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.samza.operators.KV; import org.apache.samza.test.framework.stream.CollectionStream; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Assert; @@ -44,10 +49,82 @@ public class AsyncStreamTaskIntegrationTest { .addOutputStream(output) .run(Duration.ofSeconds(2)); - Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0), + Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0), IsIterableContainingInOrder.contains(outputList.toArray())); } + @Test + public void testAsyncTaskWithSinglePartitionUsingStreamAssert() throws Exception { + List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> outputList = Arrays.asList(50, 10, 20, 30, 40); + + 
CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList); + CollectionStream output = CollectionStream.empty("async-test", "ints-out"); + + TestRunner + .of(MyAsyncStreamTask.class) + .addInputStream(input) + .addOutputStream(output) + .run(Duration.ofSeconds(2)); + + StreamAssert.containsInAnyOrder(output, outputList, Duration.ofMillis(1000)); + } + + @Test + public void testAsyncTaskWithMultiplePartition() throws Exception { + Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); + Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); + List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); + for (int i = 0; i < 5; i++) { + List<KV> keyedPartition = new ArrayList<>(); + for (Integer val : partition) { + keyedPartition.add(KV.of(i, val)); + } + inputPartitionData.put(i, keyedPartition); + expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); + } + + CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData); + CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5); + + TestRunner + .of(MyAsyncStreamTask.class) + .addInputStream(inputStream) + .addOutputStream(outputStream) + .run(Duration.ofSeconds(2)); + + StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); + } + + @Test + public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception { + Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); + Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); + List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); + for (int i = 0; i < 5; i++) { + List<KV> keyedPartition = new ArrayList<>(); + for (Integer val : partition) { + 
keyedPartition.add(KV.of(i, val)); + } + inputPartitionData.put(i, keyedPartition); + expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); + } + + CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData); + CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5); + + TestRunner + .of(MyAsyncStreamTask.class) + .addInputStream(inputStream) + .addOutputStream(outputStream) + .addOverrideConfig("task.max.concurrency", "4") + .run(Duration.ofSeconds(2)); + + StreamAssert.containsInAnyOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); + } + /** * Job should fail because it times out too soon */ http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java new file mode 100644 index 0000000..ec52aa4 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.test.operator.data.PageView; + +import java.util.Arrays; + +public class BroadcastAssertApp implements StreamApplication { + + public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName"; + + + @Override + public void init(StreamGraph graph, Config config) { + String inputTopic = config.get(INPUT_TOPIC_NAME_PROP); + + final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); + final MessageStream<PageView> broadcastPageViews = graph + .getInputStream(inputTopic, serde) + .broadcast(serde, "pv"); + + /** + * Each task will see all the pageview events + */ + MessageStreamAssert.that("Each task contains all broadcast PageView events", broadcastPageViews, serde) + .forEachTask() + .containsInAnyOrder( + Arrays.asList( + new PageView("v1", "p1", "u1"), + new PageView("v2", "p2", "u1"), + new PageView("v3", "p1", "u2"), + new PageView("v4", "p3", "u2") + )); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java index 347e766..4ecb4b6 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java @@ -60,7 +60,8 @@ class RestCall extends Thread { System.out.println("Thread " + 
this.getName() + " interrupted."); } Integer obj = (Integer) envelope.getMessage(); - messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("async-test", "ints-out"), obj * 10)); + messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("async-test", "ints-out"), + envelope.getKey(), envelope.getKey(), obj * 10)); callback.complete(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java index c83e461..a07fe74 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java @@ -32,6 +32,7 @@ public class MyStreamTestTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); - collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10)); + collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), + envelope.getKey(), envelope.getKey(), obj * 10)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 8ac40e1..ba4c985 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -20,7 +20,9 @@ package org.apache.samza.test.framework; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; @@ -60,10 +62,12 @@ public class StreamApplicationIntegrationTest { Random random = new Random(); int count = 10; List<PageView> pageviews = new ArrayList<>(count); + Map<Integer, List<PageView>> expectedOutput = new HashMap<>(); for (int i = 0; i < count; i++) { String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)]; int memberId = i; - pageviews.add(new PageView(pagekey, memberId)); + PageView pv = new PageView(pagekey, memberId); + pageviews.add(pv); } CollectionStream<PageView> input = CollectionStream.of("test", "PageView", pageviews); @@ -76,7 +80,7 @@ public class StreamApplicationIntegrationTest { .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1500)); - Assert.assertEquals(TestRunner.consumeStream(output, 10000).get(random.nextInt(count)).size(), 1); + Assert.assertEquals(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1); } public static final class Values { @@ -124,4 +128,5 @@ public class StreamApplicationIntegrationTest { .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1000)); } + } http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java new file mode 100644 index 
0000000..810d2c2 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.framework; + +import kafka.utils.TestUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.execution.TestStreamManager; +import org.apache.samza.runtime.AbstractApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.system.kafka.KafkaSystemAdmin; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import scala.Option; +import scala.Option$; + +import 
java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Harness for writing integration tests for {@link StreamApplication}s. + * + * <p> This provides the following features for its sub-classes: + * <ul> + * <li> + * Automatic Setup and teardown: Any non-trivial integration test brings up components like Zookeeper + * servers, Kafka brokers, Kafka producers and Kafka consumers. This harness initializes each + * of these components in {@link #setUp()} and shuts down each of them cleanly in {@link #tearDown()}. + * {@link #setUp()} and {@link #tearDown()} are automatically invoked from the Junit runner. + * <li> + * Interaction with Kafka: The harness provides convenience methods to interact with Kafka brokers, consumers + * and producers - for instance, methods to create topics, produce and consume messages. + * <li> + * Config defaults: Often Samza integration tests have to setup config boiler plate + * to perform even simple tasks like producing and consuming messages. This harness provides default string + * serdes for producing / consuming messages and a default system-alias named "kafka" that uses the + * {@link org.apache.samza.system.kafka.KafkaSystemFactory}. + * <li> + * Debugging: At times, it is convenient to debug integration tests locally from an IDE. This harness + * runs all its components (including Kafka brokers, Zookeeper servers and Samza) locally. + * </ul> + * + * <p> <i> Implementation Notes: </i> <br/> + * State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and + * Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()} <br/> + * + * Execution model: {@link StreamApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. 
+ * Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads. + * {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls. + * + * <h3>Usage Example</h3> + * Here is an actual test that publishes a message into Kafka, runs an application, and verifies consumption + * from the output topic. + * + * <pre> {@code + * class MyTest extends StreamApplicationIntegrationTestHarness { + * private final StreamApplication myApp = new MyStreamApplication(); + * private final Collection<String> outputTopics = Collections.singletonList("output-topic"); + * @Test + * public void test() { + * createTopic("mytopic", 1); + * produceMessage("mytopic", 0, "key1", "val1"); + * runApplication(myApp, "myApp", null); + * List<ConsumerRecord<String, String>> messages = consumeMessages(outputTopics) + * Assert.assertEquals(messages.size(), 1); + * } + * }}</pre> + */ +public class StreamApplicationIntegrationTestHarness extends AbstractIntegrationTestHarness { + private KafkaProducer producer; + private KafkaConsumer consumer; + protected KafkaSystemAdmin systemAdmin; + + private int numEmptyPolls = 3; + private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20); + private static final String DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + + /** + * Starts a single kafka broker, and a single embedded zookeeper server in their own threads. + * Sub-classes should invoke {@link #zkConnect()} and {@link #bootstrapUrl()}s to + * obtain the urls (and ports) of the started zookeeper and kafka broker. 
+ */ + @Override + public void setUp() { + super.setUp(); + + Properties consumerDeserializerProperties = new Properties(); + consumerDeserializerProperties.setProperty("key.deserializer", DEFAULT_DESERIALIZER); + consumerDeserializerProperties.setProperty("value.deserializer", DEFAULT_DESERIALIZER); + + producer = TestUtils.createNewProducer( + bootstrapServers(), // bootstrap-server url + 1, // acks + 60 * 1000L, // maxBlockMs + 1024L * 1024L, // buffer size + 0, // numRetries + 0L, // lingerMs + 5 * 1000L, // requestTimeout + SecurityProtocol.PLAINTEXT, + null, + Option.apply(new Properties()), + new StringSerializer(), + new StringSerializer(), + Option.apply(new Properties())); + + consumer = TestUtils.createNewConsumer( + bootstrapServers(), + "group", // groupId + "earliest", // auto-offset-reset + 4096L, // per-partition fetch size + "org.apache.kafka.clients.consumer.RangeAssignor", // partition Assigner + 30000, + SecurityProtocol.PLAINTEXT, + Option$.MODULE$.<File>empty(), + Option$.MODULE$.<Properties>empty(), + Option$.MODULE$.<Properties>apply(consumerDeserializerProperties)); + + systemAdmin = createSystemAdmin("kafka"); + systemAdmin.start(); + } + + /** + * Creates a kafka topic with the provided name and the number of partitions + * @param topicName the name of the topic + * @param numPartitions the number of partitions in the topic + */ + public void createTopic(String topicName, int numPartitions) { + TestUtils.createTopic(zkUtils(), topicName, numPartitions, 1, servers(), new Properties()); + } + + /** + * Produces a message to the provided topic partition. 
+ * @param topicName the topic to produce messages to + * @param partitionId the topic partition to produce messages to + * @param key the key in the message + * @param val the value in the message + */ + public void produceMessage(String topicName, int partitionId, String key, String val) { + producer.send(new ProducerRecord(topicName, partitionId, key, val)); + producer.flush(); + } + + @Override + public int clusterSize() { + return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()); + } + + + /** + * Read messages from the provided list of topics until {@param threshold} messages have been read or until + * {@link #numEmptyPolls} polls return no messages. + * + * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are + * determined by {@link #numEmptyPolls} + * + * @param topics the list of topics to consume from + * @param threshold the number of messages to consume + * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold} + */ + public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) { + int emptyPollCount = 0; + List<ConsumerRecord<String, String>> recordList = new ArrayList<>(); + consumer.subscribe(topics); + + while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) { + ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS.toMillis()); + if (!records.isEmpty()) { + Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); + while (iterator.hasNext() && recordList.size() < threshold) { + ConsumerRecord record = iterator.next(); + recordList.add(record); + emptyPollCount = 0; + } + } else { + emptyPollCount++; + } + } + return recordList; + } + + /** + * Executes the provided {@link StreamApplication} as a {@link org.apache.samza.job.local.ThreadJob}. The + * {@link StreamApplication} runs in its own separate thread. 
+ * + * @param streamApplication the application to run + * @param appName the name of the application + * @param overriddenConfigs configs to override + * @return RunApplicationContext which contains objects created within runApplication, to be used for verification + * if necessary + */ + protected RunApplicationContext runApplication(StreamApplication streamApplication, + String appName, + Map<String, String> overriddenConfigs) { + Map<String, String> configMap = new HashMap<>(); + configMap.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory"); + configMap.put("job.name", appName); + configMap.put("app.class", streamApplication.getClass().getCanonicalName()); + configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory"); + configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory"); + configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); + configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); + configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); + configMap.put("systems.kafka.samza.key.serde", "string"); + configMap.put("systems.kafka.samza.msg.serde", "string"); + configMap.put("systems.kafka.samza.offset.default", "oldest"); + configMap.put("job.coordinator.system", "kafka"); + configMap.put("job.default.system", "kafka"); + configMap.put("job.coordinator.replication.factor", "1"); + configMap.put("task.window.ms", "1000"); + configMap.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName()); + + // This is to prevent tests from taking a long time to stop after they're done. The issue is that + // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately. 
+ // The test process then exits, triggering the SamzaContainer shutdown hook, which in turn tries to flush any + // store changelogs, which then get stuck trying to produce to the stopped Kafka server. + // Calling runner.kill doesn't work since RemoteApplicationRunner creates a new ThreadJob instance when + // kill is called. We can't use LocalApplicationRunner since ZkJobCoordinator doesn't currently create + // changelog streams. Hence we just force an unclean shutdown here to. This _should be_ OK + // since the test method has already executed by the time the shutdown hook is called. The side effect is + // that buffered state (e.g. changelog contents) might not be flushed correctly after the test run. + configMap.put("task.shutdown.ms", "1"); + + if (overriddenConfigs != null) { + configMap.putAll(overriddenConfigs); + } + + Config config = new MapConfig(configMap); + AbstractApplicationRunner runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(config); + runner.run(streamApplication); + + MessageStreamAssert.waitForComplete(); + return new RunApplicationContext(runner, config); + } + + public void setNumEmptyPolls(int numEmptyPolls) { + this.numEmptyPolls = numEmptyPolls; + } + + /** + * Shutdown and clear Zookeeper and Kafka broker state. + */ + @Override + public void tearDown() { + systemAdmin.stop(); + producer.close(); + consumer.close(); + super.tearDown(); + } + + /** + * Container for any necessary context created during runApplication. Allows tests to access objects created within + * runApplication in order to do verification. 
+ */ + protected static class RunApplicationContext { + private final AbstractApplicationRunner runner; + private final Config config; + + private RunApplicationContext(AbstractApplicationRunner runner, Config config) { + this.runner = runner; + this.config = config; + } + + public AbstractApplicationRunner getRunner() { + return this.runner; + } + + public Config getConfig() { + return this.config; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index 2cc5977..f888b4a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -20,14 +20,20 @@ package org.apache.samza.test.framework; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; +import org.apache.samza.operators.KV; import org.apache.samza.test.framework.stream.CollectionStream; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Assert; import org.junit.Test; + public class StreamTaskIntegrationTest { @Test @@ -40,7 +46,7 @@ public class StreamTaskIntegrationTest { TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1)); - Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0), + Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0), IsIterableContainingInOrder.contains(outputList.toArray())); } @@ -54,11 +60,79 @@ 
public class StreamTaskIntegrationTest { CollectionStream<Double> input = CollectionStream.of("test", "doubles", inputList); CollectionStream output = CollectionStream.empty("test", "output"); + TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1)); + } + + @Test + public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception { + List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50); + + CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList); + CollectionStream output = CollectionStream.empty("test", "output"); + TestRunner .of(MyStreamTestTask.class) .addInputStream(input) .addOutputStream(output) + .addOverrideConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(1)); + + StreamAssert.containsInOrder(output, outputList, Duration.ofMillis(1000)); } + @Test + public void testSyncTaskWithMultiplePartition() throws Exception { + Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); + Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); + List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); + for (int i = 0; i < 5; i++) { + List<KV> keyedPartition = new ArrayList<>(); + for (Integer val : partition) { + keyedPartition.add(KV.of(i, val)); + } + inputPartitionData.put(i, keyedPartition); + expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); + } + + CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData); + CollectionStream outputStream = CollectionStream.empty("test", "output", 5); + + TestRunner + .of(MyStreamTestTask.class) + .addInputStream(inputStream) + .addOutputStream(outputStream) + .run(Duration.ofSeconds(2)); + + StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, 
Duration.ofMillis(1000)); + } + + @Test + public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception { + Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); + Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); + List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); + for (int i = 0; i < 5; i++) { + List<KV> keyedPartition = new ArrayList<>(); + for (Integer val : partition) { + keyedPartition.add(KV.of(i, val)); + } + inputPartitionData.put(i, keyedPartition); + expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); + } + + CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData); + CollectionStream outputStream = CollectionStream.empty("test", "output", 5); + + TestRunner + .of(MyStreamTestTask.class) + .addInputStream(inputStream) + .addOutputStream(outputStream) + .addOverrideConfig("job.container.thread.pool.size", "4") + .run(Duration.ofSeconds(2)); + + StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java new file mode 100644 index 0000000..09da838 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.test.operator.data.PageView; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class TestTimerApp implements StreamApplication { + public static final String PAGE_VIEWS = "page-views"; + + @Override + public void init(StreamGraph graph, Config config) { + final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); + final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, serde); + final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn()); + + MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde) + .containsInAnyOrder( + Arrays.asList( + new PageView("v1-complete", "p1", "u1"), + new PageView("v2-complete", "p2", "u1"), + new PageView("v3-complete", "p1", "u2"), + new PageView("v4-complete", "p3", 
"u2") + )); + } + + private static class FlatmapTimerFn implements FlatMapFunction<PageView, PageView>, TimerFunction<String, PageView> { + + private transient List<PageView> pageViews; + private transient TimerRegistry<String> timerRegistry; + + @Override + public void registerTimer(TimerRegistry<String> timerRegistry) { + this.timerRegistry = timerRegistry; + this.pageViews = new ArrayList<>(); + } + + @Override + public Collection<PageView> apply(PageView message) { + final PageView pv = new PageView(message.getViewId() + "-complete", message.getPageId(), message.getUserId()); + pageViews.add(pv); + + if (pageViews.size() == 2) { + //got all messages for this task + final long time = System.currentTimeMillis() + 100; + timerRegistry.register("CompleteTimer", time); + } + return Collections.emptyList(); + } + + @Override + public Collection<PageView> onTimer(String key, long time) { + return pageViews; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java new file mode 100644 index 0000000..a48409c --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework; + +import org.junit.Before; +import org.junit.Test; + + +import static org.apache.samza.test.framework.TestTimerApp.PAGE_VIEWS; + +public class TimerTest extends StreamApplicationIntegrationTestHarness { + + @Before + public void setup() { + // create topics + createTopic(PAGE_VIEWS, 2); + + // create events for the following user activity. + // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1)), (v2, p2, (a3)) + // u2: (v3, p1, (a1)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}"); + produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}"); + + } + + @Test + public void testJob() { + runApplication(new TestTimerApp(), "TimerTest", null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java deleted file mode 100644 index 88e2765..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.operator; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.test.operator.data.PageView; -import org.apache.samza.test.framework.StreamAssert; - -import java.util.Arrays; - -public class BroadcastAssertApp implements StreamApplication { - - public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName"; - - - @Override - public void init(StreamGraph graph, Config config) { - String inputTopic = config.get(INPUT_TOPIC_NAME_PROP); - - final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); - final MessageStream<PageView> broadcastPageViews = graph - .getInputStream(inputTopic, serde) - .broadcast(serde, "pv"); - - /** - * Each task will see all the pageview events - */ - StreamAssert.that("Each task contains all broadcast PageView events", broadcastPageViews, serde) - .forEachTask() - .containsInAnyOrder( - Arrays.asList( - new PageView("v1", "p1", "u1"), - new PageView("v2", "p2", "u1"), - new 
PageView("v3", "p1", "u2"), - new PageView("v4", "p3", "u2") - )); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java deleted file mode 100644 index 3c11533..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.test.operator; - -import kafka.utils.TestUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.config.KafkaConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.execution.TestStreamManager; -import org.apache.samza.runtime.AbstractApplicationRunner; -import org.apache.samza.runtime.ApplicationRunner; -import org.apache.samza.system.kafka.KafkaSystemAdmin; -import org.apache.samza.test.harness.AbstractIntegrationTestHarness; -import org.apache.samza.test.framework.StreamAssert; -import scala.Option; -import scala.Option$; - -import java.io.File; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * Harness for writing integration tests for {@link StreamApplication}s. - * - * <p> This provides the following features for its sub-classes: - * <ul> - * <li> - * Automatic Setup and teardown: Any non-trivial integration test brings up components like Zookeeper - * servers, Kafka brokers, Kafka producers and Kafka consumers. This harness initializes each - * of these components in {@link #setUp()} and shuts down each of them cleanly in {@link #tearDown()}. - * {@link #setUp()} and {@link #tearDown()} are automatically invoked from the Junit runner. 
- * <li> - * Interaction with Kafka: The harness provides convenience methods to interact with Kafka brokers, consumers - * and producers - for instance, methods to create topics, produce and consume messages. - * <li> - * Config defaults: Often Samza integration tests have to setup config boiler plate - * to perform even simple tasks like producing and consuming messages. This harness provides default string - * serdes for producing / consuming messages and a default system-alias named "kafka" that uses the - * {@link org.apache.samza.system.kafka.KafkaSystemFactory}. - * <li> - * Debugging: At times, it is convenient to debug integration tests locally from an IDE. This harness - * runs all its components (including Kafka brokers, Zookeeper servers and Samza) locally. - * </ul> - * - * <p> <i> Implementation Notes: </i> <br/> - * State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and - * Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()} <br/> - * - * Execution model: {@link StreamApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. - * Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads. - * {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls. - * - * <h3>Usage Example</h3> - * Here is an actual test that publishes a message into Kafka, runs an application, and verifies consumption - * from the output topic. 
- * - * <pre> {@code - * class MyTest extends StreamApplicationIntegrationTestHarness { - * private final StreamApplication myApp = new MyStreamApplication(); - * private final Collection<String> outputTopics = Collections.singletonList("output-topic"); - * @Test - * public void test() { - * createTopic("mytopic", 1); - * produceMessage("mytopic", 0, "key1", "val1"); - * runApplication(myApp, "myApp", null); - * List<ConsumerRecord<String, String>> messages = consumeMessages(outputTopics) - * Assert.assertEquals(messages.size(), 1); - * } - * }}</pre> - */ -public class StreamApplicationIntegrationTestHarness extends AbstractIntegrationTestHarness { - private KafkaProducer producer; - private KafkaConsumer consumer; - protected KafkaSystemAdmin systemAdmin; - - private int numEmptyPolls = 3; - private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20); - private static final String DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - - /** - * Starts a single kafka broker, and a single embedded zookeeper server in their own threads. - * Sub-classes should invoke {@link #zkConnect()} and {@link #bootstrapUrl()}s to - * obtain the urls (and ports) of the started zookeeper and kafka broker. 
- */ - @Override - public void setUp() { - super.setUp(); - - Properties consumerDeserializerProperties = new Properties(); - consumerDeserializerProperties.setProperty("key.deserializer", DEFAULT_DESERIALIZER); - consumerDeserializerProperties.setProperty("value.deserializer", DEFAULT_DESERIALIZER); - - producer = TestUtils.createNewProducer( - bootstrapServers(), // bootstrap-server url - 1, // acks - 60 * 1000L, // maxBlockMs - 1024L * 1024L, // buffer size - 0, // numRetries - 0L, // lingerMs - 5 * 1000L, // requestTimeout - SecurityProtocol.PLAINTEXT, - null, - Option.apply(new Properties()), - new StringSerializer(), - new StringSerializer(), - Option.apply(new Properties())); - - consumer = TestUtils.createNewConsumer( - bootstrapServers(), - "group", // groupId - "earliest", // auto-offset-reset - 4096L, // per-partition fetch size - "org.apache.kafka.clients.consumer.RangeAssignor", // partition Assigner - 30000, - SecurityProtocol.PLAINTEXT, - Option$.MODULE$.<File>empty(), - Option$.MODULE$.<Properties>empty(), - Option$.MODULE$.<Properties>apply(consumerDeserializerProperties)); - - systemAdmin = createSystemAdmin("kafka"); - systemAdmin.start(); - } - - /** - * Creates a kafka topic with the provided name and the number of partitions - * @param topicName the name of the topic - * @param numPartitions the number of partitions in the topic - */ - public void createTopic(String topicName, int numPartitions) { - TestUtils.createTopic(zkUtils(), topicName, numPartitions, 1, servers(), new Properties()); - } - - /** - * Produces a message to the provided topic partition. 
- * @param topicName the topic to produce messages to - * @param partitionId the topic partition to produce messages to - * @param key the key in the message - * @param val the value in the message - */ - public void produceMessage(String topicName, int partitionId, String key, String val) { - producer.send(new ProducerRecord(topicName, partitionId, key, val)); - producer.flush(); - } - - @Override - public int clusterSize() { - return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()); - } - - - /** - * Read messages from the provided list of topics until {@param threshold} messages have been read or until - * {@link #numEmptyPolls} polls return no messages. - * - * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are - * determined by {@link #numEmptyPolls} - * - * @param topics the list of topics to consume from - * @param threshold the number of messages to consume - * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold} - */ - public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) { - int emptyPollCount = 0; - List<ConsumerRecord<String, String>> recordList = new ArrayList<>(); - consumer.subscribe(topics); - - while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) { - ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS.toMillis()); - if (!records.isEmpty()) { - Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); - while (iterator.hasNext() && recordList.size() < threshold) { - ConsumerRecord record = iterator.next(); - recordList.add(record); - emptyPollCount = 0; - } - } else { - emptyPollCount++; - } - } - return recordList; - } - - /** - * Executes the provided {@link StreamApplication} as a {@link org.apache.samza.job.local.ThreadJob}. The - * {@link StreamApplication} runs in its own separate thread. 
- * - * @param streamApplication the application to run - * @param appName the name of the application - * @param overriddenConfigs configs to override - * @return RunApplicationContext which contains objects created within runApplication, to be used for verification - * if necessary - */ - protected RunApplicationContext runApplication(StreamApplication streamApplication, - String appName, - Map<String, String> overriddenConfigs) { - Map<String, String> configMap = new HashMap<>(); - configMap.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory"); - configMap.put("job.name", appName); - configMap.put("app.class", streamApplication.getClass().getCanonicalName()); - configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory"); - configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory"); - configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); - configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); - configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); - configMap.put("systems.kafka.samza.key.serde", "string"); - configMap.put("systems.kafka.samza.msg.serde", "string"); - configMap.put("systems.kafka.samza.offset.default", "oldest"); - configMap.put("job.coordinator.system", "kafka"); - configMap.put("job.default.system", "kafka"); - configMap.put("job.coordinator.replication.factor", "1"); - configMap.put("task.window.ms", "1000"); - configMap.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName()); - - // This is to prevent tests from taking a long time to stop after they're done. The issue is that - // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately. 
- // The test process then exits, triggering the SamzaContainer shutdown hook, which in turn tries to flush any - // store changelogs, which then get stuck trying to produce to the stopped Kafka server. - // Calling runner.kill doesn't work since RemoteApplicationRunner creates a new ThreadJob instance when - // kill is called. We can't use LocalApplicationRunner since ZkJobCoordinator doesn't currently create - // changelog streams. Hence we just force an unclean shutdown here to. This _should be_ OK - // since the test method has already executed by the time the shutdown hook is called. The side effect is - // that buffered state (e.g. changelog contents) might not be flushed correctly after the test run. - configMap.put("task.shutdown.ms", "1"); - - if (overriddenConfigs != null) { - configMap.putAll(overriddenConfigs); - } - - Config config = new MapConfig(configMap); - AbstractApplicationRunner runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(config); - runner.run(streamApplication); - - StreamAssert.waitForComplete(); - return new RunApplicationContext(runner, config); - } - - public void setNumEmptyPolls(int numEmptyPolls) { - this.numEmptyPolls = numEmptyPolls; - } - - /** - * Shutdown and clear Zookeeper and Kafka broker state. - */ - @Override - public void tearDown() { - systemAdmin.stop(); - producer.close(); - consumer.close(); - super.tearDown(); - } - - /** - * Container for any necessary context created during runApplication. Allows tests to access objects created within - * runApplication in order to do verification. 
- */ - protected static class RunApplicationContext { - private final AbstractApplicationRunner runner; - private final Config config; - - private RunApplicationContext(AbstractApplicationRunner runner, Config config) { - this.runner = runner; - this.config = config; - } - - public AbstractApplicationRunner getRunner() { - return this.runner; - } - - public Config getConfig() { - return this.config; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index a9a4026..2f75103 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -27,6 +27,8 @@ import org.apache.samza.Partition; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; import org.apache.samza.system.kafka.KafkaSystemAdmin; +import org.apache.samza.test.framework.BroadcastAssertApp; +import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness; import org.apache.samza.util.ExponentialSleepStrategy; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java index fbc315f..058a690 100644 --- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness; import org.apache.samza.test.operator.data.PageView; import org.codehaus.jackson.map.ObjectMapper; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java deleted file mode 100644 index 94c1eca..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.test.timer; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.test.operator.data.PageView; -import org.apache.samza.test.framework.StreamAssert; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -public class TestTimerApp implements StreamApplication { - public static final String PAGE_VIEWS = "page-views"; - - @Override - public void init(StreamGraph graph, Config config) { - final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); - final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, serde); - final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn()); - - StreamAssert.that("Output from timer function should container all complete messages", output, serde) - .containsInAnyOrder( - Arrays.asList( - new PageView("v1-complete", "p1", "u1"), - new PageView("v2-complete", "p2", "u1"), - new PageView("v3-complete", "p1", "u2"), - new PageView("v4-complete", "p3", "u2") - )); - } - - private static class FlatmapTimerFn implements FlatMapFunction<PageView, PageView>, TimerFunction<String, PageView> { - - private transient List<PageView> pageViews; - private transient TimerRegistry<String> timerRegistry; - - @Override - public void registerTimer(TimerRegistry<String> timerRegistry) { - this.timerRegistry = timerRegistry; - this.pageViews = new ArrayList<>(); - } - - @Override - public Collection<PageView> apply(PageView message) { - final PageView pv = new PageView(message.getViewId() + "-complete", 
message.getPageId(), message.getUserId()); - pageViews.add(pv); - - if (pageViews.size() == 2) { - //got all messages for this task - final long time = System.currentTimeMillis() + 100; - timerRegistry.register("CompleteTimer", time); - } - return Collections.emptyList(); - } - - @Override - public Collection<PageView> onTimer(String key, long time) { - return pageViews; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/13e26fb4/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java deleted file mode 100644 index 11b3aeb..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.test.timer; - -import org.apache.samza.test.operator.StreamApplicationIntegrationTestHarness; -import org.junit.Before; -import org.junit.Test; - - -import static org.apache.samza.test.timer.TestTimerApp.PAGE_VIEWS; - -public class TimerTest extends StreamApplicationIntegrationTestHarness { - - @Before - public void setup() { - // create topics - createTopic(PAGE_VIEWS, 2); - - // create events for the following user activity. - // userId: (viewId, pageId, (adIds)) - // u1: (v1, p1, (a1)), (v2, p2, (a3)) - // u2: (v3, p1, (a1)), (v4, p3, (a5)) - produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); - produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); - produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}"); - produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}"); - - } - - @Test - public void testJob() { - runApplication(new TestTimerApp(), "TimerTest", null); - } -}
