SAMZA-1840: Refactor TestRunner APIs to use StreamDescriptor and SystemDescriptor
CollectionStream -> InMemoryInputDescriptor & InMemoryOutputDescriptor CollectionStreamSystemSpec -> InMemorySystemDescriptor Author: Sanil Jain <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Cameron Lee <[email protected]> Closes #634 from Sanil15/SAMZA-1840 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1755268c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1755268c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1755268c Branch: refs/heads/NewKafkaSystemConsumer Commit: 1755268cff201663a41ca06e9dcc4602d41fc306 Parents: 3bb24c8 Author: Sanil Jain <[email protected]> Authored: Wed Sep 19 12:21:12 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Sep 19 12:21:12 2018 -0700 ---------------------------------------------------------------------- .../samza/example/PageViewCounterExample.java | 1 - .../samza/test/framework/StreamAssert.java | 73 ++--- .../apache/samza/test/framework/TestRunner.java | 286 +++++++++---------- .../test/framework/stream/CollectionStream.java | 204 ------------- .../system/CollectionStreamSystemSpec.java | 90 ------ .../system/InMemoryInputDescriptor.java | 42 +++ .../system/InMemoryOutputDescriptor.java | 46 +++ .../system/InMemorySystemDescriptor.java | 118 ++++++++ .../AsyncStreamTaskIntegrationTest.java | 108 ++++--- .../StreamApplicationIntegrationTest.java | 45 ++- .../framework/StreamTaskIntegrationTest.java | 112 +++++--- .../table/TestLocalTableWithSideInputs.java | 32 ++- 12 files changed, 573 insertions(+), 584 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java 
b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java index b540585..e2ebc93 100644 --- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java @@ -53,7 +53,6 @@ public class PageViewCounterExample implements StreamApplication { Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); PageViewCounterExample app = new PageViewCounterExample(); ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config); - runner.run(); runner.waitForFinish(); } http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java index 9972d7f..42379f3 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java @@ -22,9 +22,9 @@ package org.apache.samza.test.framework; import com.google.common.base.Preconditions; import java.time.Duration; import java.util.stream.Collectors; -import org.apache.samza.test.framework.stream.CollectionStream; import java.util.List; import java.util.Map; +import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.hamcrest.collection.IsIterableContainingInOrder; @@ -32,22 +32,24 @@ import static org.junit.Assert.assertThat; /** - * Assertion utils non the content of a {@link CollectionStream}. + * Assertion utils on the content of a stream described by + * {@link org.apache.samza.operators.descriptors.base.stream.StreamDescriptor}. 
*/ public class StreamAssert { /** - * Util to assert presence of messages in a stream with single partition in any order + * Verifies that the {@code expected} messages are present in any order in the single partition stream + * represented by {@code outputDescriptor} * - * @param collectionStream represents the actual stream which will be consumed to compare against expected list - * @param expected represents the expected stream of messages + * @param expected expected stream of messages + * @param outputDescriptor describes the stream which will be consumed to compare against expected list * @param timeout maximum time to wait for consuming the stream - * @param <M> represents the type of Message in the stream + * @param <StreamMessageType> type of messages in the stream * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages */ - public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout) - throws InterruptedException { - Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); - assertThat(TestRunner.consumeStream(collectionStream, timeout) + public static <StreamMessageType> void containsInAnyOrder(List<StreamMessageType> expected, + InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException { + Preconditions.checkNotNull(outputDescriptor); + assertThat(TestRunner.consumeStream(outputDescriptor, timeout) .entrySet() .stream() .flatMap(entry -> entry.getValue().stream()) @@ -55,19 +57,20 @@ public class StreamAssert { } /** - * Util to assert presence of messages in a stream with multiple partition in any order + * Verifies that the {@code expected} messages are present in any order in the multi partition stream + * represented by {@code outputDescriptor} * - * @param collectionStream represents the actual stream which will be consumed to compare 
against expected partition map - * @param expected represents a map of partitionId as key and list of messages in stream as value + * @param expected map of partitionId as key and list of messages in stream as value + * @param outputDescriptor describes the stream which will be consumed to compare against expected partition map * @param timeout maximum time to wait for consuming the stream - * @param <M> represents the type of Message in the stream + * @param <StreamMessageType> type of messages in the stream * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages * */ - public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected, - Duration timeout) throws InterruptedException { - Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); - Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout); + public static <StreamMessageType> void containsInAnyOrder(Map<Integer, List<StreamMessageType>> expected, + InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException { + Preconditions.checkNotNull(outputDescriptor); + Map<Integer, List<StreamMessageType>> actual = TestRunner.consumeStream(outputDescriptor, timeout); for (Integer paritionId : expected.keySet()) { assertThat(actual.get(paritionId), IsIterableContainingInAnyOrder.containsInAnyOrder(expected.get(paritionId).toArray())); @@ -75,18 +78,19 @@ public class StreamAssert { } /** - * Util to assert ordering of messages in a stream with single partition + * Verifies that the {@code expected} messages are present in order in the single partition stream + * represented by {@code outputDescriptor} * - * @param collectionStream represents the actual stream which will be consumed to compare against expected list - * @param expected represents the expected stream of messages + * @param 
expected expected stream of messages + * @param outputDescriptor describes the stream which will be consumed to compare against expected list * @param timeout maximum time to wait for consuming the stream - * @param <M> represents the type of Message in the stream + * @param <StreamMessageType> type of messages in the stream * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages */ - public static <M> void containsInOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout) - throws InterruptedException { - Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); - assertThat(TestRunner.consumeStream(collectionStream, timeout) + public static <StreamMessageType> void containsInOrder(List<StreamMessageType> expected, + InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException { + Preconditions.checkNotNull(outputDescriptor); + assertThat(TestRunner.consumeStream(outputDescriptor, timeout) .entrySet() .stream() .flatMap(entry -> entry.getValue().stream()) @@ -94,18 +98,19 @@ public class StreamAssert { } /** - * Util to assert ordering of messages in a multi-partitioned stream + * Verifies that the {@code expected} messages are present in order in the multi partition stream + * represented by {@code outputDescriptor} * - * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map - * @param expected represents a map of partitionId as key and list of messages as value + * @param expected map of partitionId as key and list of messages as value + * @param outputDescriptor describes the stream which will be consumed to compare against expected partition map * @param timeout maximum time to wait for consuming the stream - * @param <M> represents the type of Message in the stream + * @param <StreamMessageType> type of messages in the 
stream * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages */ - public static <M> void containsInOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected, - Duration timeout) throws InterruptedException { - Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream"); - Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout); + public static <StreamMessageType> void containsInOrder(Map<Integer, List<StreamMessageType>> expected, + InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException { + Preconditions.checkNotNull(outputDescriptor); + Map<Integer, List<StreamMessageType>> actual = TestRunner.consumeStream(outputDescriptor, timeout); for (Integer paritionId : expected.keySet()) { assertThat(actual.get(paritionId), IsIterableContainingInOrder.contains(expected.get(paritionId).toArray())); } http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 033bcdf..5c4ba3b 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -35,7 +35,6 @@ import org.apache.samza.application.SamzaApplication; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.TaskApplication; import org.apache.samza.config.Config; -import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; @@ -62,58 +61,47 @@ import 
org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTask; import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.task.TaskFactory; -import org.apache.samza.test.framework.stream.CollectionStream; -import org.apache.samza.test.framework.system.CollectionStreamSystemSpec; +import org.apache.samza.test.framework.system.InMemoryInputDescriptor; +import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; +import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.junit.Assert; /** - * TestRunner provides apis to quickly set up tests for Samza low level and high level apis. Default running mode - * for test is Single container without any distributed coordination service. Test runner maintains global job config - * {@code configs} that are used to run the Samza job + * TestRunner provides APIs to set up integration tests for a Samza application. + * Running mode for test is Single container mode + * Test sets following configuration for the application * - * For single container mode following configs are set by default + * The following configs are set by default * <ol> * <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li> * <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li> * <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li> * <li>"job.name" = "test-samza"</li> * <li>"processor.id" = "1"</li> - * <li>"inmemory.scope = " Scope id generated to isolate the run for InMemorySystem</li> * </ol> * */ public class TestRunner { - - private static final String JOB_NAME = "test-samza"; - public enum Mode { - SINGLE_CONTAINER, MULTI_CONTAINER - } + public static final String JOB_NAME = "samza-test"; private Map<String, String> configs; - private Map<String, CollectionStreamSystemSpec> systems; private Class taskClass; private StreamApplication app; - private String testId; - private SystemFactory factory; - - 
/** - * Mode defines single or multi container running configuration, by default a single container configuration is assumed + /* + * inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor} + * provides an isolated state to run with in memory system */ - private Mode mode; + private String inMemoryScope; private TestRunner() { - this.testId = RandomStringUtils.random(10, true, true); - this.systems = new HashMap<String, CollectionStreamSystemSpec>(); this.configs = new HashMap<>(); - this.mode = Mode.SINGLE_CONTAINER; - this.factory = new InMemorySystemFactory(); - configs.put(InMemorySystemConfig.INMEMORY_SCOPE, testId); + this.inMemoryScope = RandomStringUtils.random(10, true, true); configs.put(JobConfig.JOB_NAME(), JOB_NAME); - configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1"); - configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); - configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); - configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + configs.put(JobConfig.PROCESSOR_ID(), "1"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); } /** @@ -129,7 +117,7 @@ public class TestRunner { /** * Constructs a new {@link TestRunner} from following components - * @param app represent a class containing Samza job logic implementing {@link StreamApplication} + * @param app samza job implementing {@link StreamApplication} */ private TestRunner(StreamApplication app) { this(); @@ -138,20 +126,9 @@ public class TestRunner { } /** - * Registers a system with 
TestRunner if not already registered and configures all the system configs to global - * job configs - */ - private void registerSystem(String systemName) { - if (!systems.containsKey(systemName)) { - systems.put(systemName, CollectionStreamSystemSpec.create(systemName, JOB_NAME)); - configs.putAll(systems.get(systemName).getSystemConfigs()); - } - } - - /** * Creates an instance of {@link TestRunner} for Low Level Samza Api - * @param taskClass represent a class extending either {@link StreamTask} or {@link AsyncStreamTask} - * @return a {@link TestRunner} for {@code taskClass} + * @param taskClass samza job extending either {@link StreamTask} or {@link AsyncStreamTask} + * @return this {@link TestRunner} */ public static TestRunner of(Class taskClass) { Preconditions.checkNotNull(taskClass); @@ -162,8 +139,8 @@ public class TestRunner { /** * Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api - * @param app represent a class representing Samza job by implementing {@link StreamApplication} - * @return a {@link TestRunner} for {@code app} + * @param app samza job implementing {@link StreamApplication} + * @return this {@link TestRunner} */ public static TestRunner of(StreamApplication app) { Preconditions.checkNotNull(app); @@ -171,11 +148,11 @@ public class TestRunner { } /** - * Only adds a config from {@code config} to global {@code configs} if they dont exist in it. - * @param config represents the {@link Config} supposed to be added to global configs - * @return calling instance of {@link TestRunner} with added configs if they don't exist + * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. 
+ * @param config configs for the application + * @return this {@link TestRunner} */ - public TestRunner addConfigs(Config config) { + public TestRunner addConfigs(Map<String, String> config) { Preconditions.checkNotNull(config); config.forEach(this.configs::putIfAbsent); return this; @@ -186,7 +163,7 @@ public class TestRunner { * exisiting in {@code configs} * @param key key of the config * @param value value of the config - * @return calling instance of {@link TestRunner} with added config + * @return this {@link TestRunner} */ public TestRunner addOverrideConfig(String key, String value) { Preconditions.checkNotNull(key); @@ -197,94 +174,72 @@ public class TestRunner { } /** - * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs. - * <p> - * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with - * {@link TestRunner} if not registered already. Then it creates and initializes the stream partitions with messages for - * the registered System - * <p> - * @param stream represents the stream that is supposed to be configured with {@link TestRunner} - * @return calling instance of {@link TestRunner} with {@code stream} configured with it + * Adds the provided input stream with mock data to the test application. + * + * @param descriptor describes the stream that is supposed to be input to Samza application + * @param messages messages used to initialize the single partition stream + * @param <StreamMessageType> a message with null key or a KV {@link org.apache.samza.operators.KV}. 
+ * key of KV represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or + * {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message + * @return this {@link TestRunner} */ - public TestRunner addInputStream(CollectionStream stream) { - Preconditions.checkNotNull(stream); - registerSystem(stream.getSystemName()); - initializeInput(stream); - stream.setTestId(testId); - if (configs.containsKey(TaskConfig.INPUT_STREAMS())) { - configs.put(TaskConfig.INPUT_STREAMS(), - configs.get(TaskConfig.INPUT_STREAMS()).concat("," + stream.getSystemName() + "." + stream.getPhysicalName())); - } else { - configs.put(TaskConfig.INPUT_STREAMS(), stream.getSystemName() + "." + stream.getPhysicalName()); - } - stream.getStreamConfig().forEach((key, val) -> { - configs.putIfAbsent((String) key, (String) val); - }); - + public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor, + List<StreamMessageType> messages) { + Preconditions.checkNotNull(descriptor, messages); + Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<Integer, Iterable<StreamMessageType>>(); + partitionData.put(0, messages); + initializeInMemoryInputStream(descriptor, partitionData); return this; } /** - * Creates an in memory stream with {@link InMemorySystemFactory} and initializes the metadata for the stream. - * Initializes each partition of that stream with messages from {@code stream.getInitPartitions} - * - * @param stream represents the stream to initialize with the in memory system - * @param <T> can represent a message or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a - * {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope} - * and value represents the message + * Adds the provided input stream with mock data to the test application. 
+ * @param descriptor describes the stream that is supposed to be input to Samza application + * @param messages map whose key is partitionId and value is messages in the partition + * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}. + * A key of which represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or + * {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message + * @return this {@link TestRunner} */ - private <T> void initializeInput(CollectionStream stream) { - Preconditions.checkNotNull(stream); - Preconditions.checkState(stream.getInitPartitions().size() >= 1); - String streamName = stream.getStreamName(); - String systemName = stream.getSystemName(); - Map<Integer, Iterable<T>> partitions = stream.getInitPartitions(); - StreamSpec spec = new StreamSpec(streamName, stream.getPhysicalName(), systemName, partitions.size()); - factory.getAdmin(systemName, new MapConfig(configs)).createStream(spec); - SystemProducer producer = factory.getProducer(systemName, new MapConfig(configs), null); - partitions.forEach((partitionId, partition) -> { - partition.forEach(e -> { - Object key = e instanceof KV ? ((KV) e).getKey() : null; - Object value = e instanceof KV ? ((KV) e).getValue() : e; - producer.send(systemName, - new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), key, - value)); - }); - producer.send(systemName, - new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), null, - new EndOfStreamMessage(null))); - }); + public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor, + Map<Integer, ? 
extends Iterable<StreamMessageType>> messages) { + Preconditions.checkNotNull(descriptor, messages); + Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<Integer, Iterable<StreamMessageType>>(); + partitionData.putAll(messages); + initializeInMemoryInputStream(descriptor, partitionData); + return this; } /** - * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs. - * <p> - * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with - * {@link TestRunner} if not registered already. Then it creates the stream partitions with the registered System - * <p> - * @param stream represents the stream that is supposed to be configured with {@link TestRunner} - * @return calling instance of {@link TestRunner} with {@code stream} configured with it + * Adds the provided output stream to the test application. + * @param streamDescriptor describes the stream that is supposed to be output for the Samza application + * @param partitionCount partition count of output stream + * @return this {@link TestRunner} */ - public TestRunner addOutputStream(CollectionStream stream) { - Preconditions.checkNotNull(stream); - Preconditions.checkState(stream.getInitPartitions().size() >= 1); - registerSystem(stream.getSystemName()); - stream.setTestId(testId); - StreamSpec spec = new StreamSpec(stream.getStreamName(), stream.getPhysicalName(), stream.getSystemName(), stream.getInitPartitions().size()); + public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) { + Preconditions.checkNotNull(streamDescriptor); + Preconditions.checkState(partitionCount >= 1); + InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor(); + imsd.withInMemoryScope(this.inMemoryScope); + Config config = new MapConfig(streamDescriptor.toConfig(), streamDescriptor.getSystemDescriptor().toConfig()); + 
InMemorySystemFactory factory = new InMemorySystemFactory(); + String physicalName = (String) streamDescriptor.getPhysicalName().orElse(streamDescriptor.getStreamId()); + StreamSpec spec = new StreamSpec(streamDescriptor.getStreamId(), physicalName, streamDescriptor.getSystemName(), + partitionCount); factory - .getAdmin(stream.getSystemName(), new MapConfig(configs)) + .getAdmin(streamDescriptor.getSystemName(), config) .createStream(spec); - configs.putAll(stream.getStreamConfig()); + addConfigs(streamDescriptor.toConfig()); + addConfigs(streamDescriptor.getSystemDescriptor().toConfig()); return this; } - /** - * Utility to run a test configured using TestRunner + * Run the application with the specified timeout * - * @param timeout time to wait for the high level application or low level task to finish. This timeout does not include + * @param timeout time to wait for the application to finish. This timeout does not include * input stream initialization time or the assertion time over output streams. This timeout just accounts - * for time that samza job takes run. Samza job won't be invoked with negative or zero timeout + * for time that samza job takes run. Timeout must be greater than 0. * @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode */ public void run(Duration timeout) { @@ -301,34 +256,33 @@ public class TestRunner { throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable())); } } + /** - * Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the - * TestRunner in order to assert over the streams (ex output streams). 
+ * Gets the contents of the output stream represented by {@code outputDescriptor} after {@link TestRunner#run(Duration)} + * has completed * - * @param stream represents {@link CollectionStream} whose current state of partitions is requested to be fetched - * @param timeout poll timeout in Ms - * @param <T> represents type of message + * @param outputDescriptor describes the stream to be consumed + * @param timeout timeout for consumption of stream in Ms + * @param <StreamMessageType> type of message * - * @return a map key of which represents the {@code partitionId} and value represents the current state of the partition - * i.e messages in the partition - * @throws InterruptedException Thrown when a blocking poll has been interrupted by another thread. + * @return a map whose key is {@code partitionId} and value is messages in partition + * @throws SamzaException Thrown when a poll is incomplete */ - public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Duration timeout) throws InterruptedException { - Preconditions.checkNotNull(stream); - Preconditions.checkNotNull(stream.getSystemName()); - String streamName = stream.getStreamName(); - String systemName = stream.getSystemName(); + public static <StreamMessageType> Map<Integer, List<StreamMessageType>> consumeStream( + InMemoryOutputDescriptor outputDescriptor, Duration timeout) throws SamzaException { + Preconditions.checkNotNull(outputDescriptor); + String streamId = outputDescriptor.getStreamId(); + String systemName = outputDescriptor.getSystemName(); Set<SystemStreamPartition> ssps = new HashSet<>(); - Set<String> streamNames = new HashSet<>(); - streamNames.add(streamName); + Set<String> streamIds = new HashSet<>(); + streamIds.add(streamId); SystemFactory factory = new InMemorySystemFactory(); - HashMap<String, String> config = new HashMap<>(); - config.put(InMemorySystemConfig.INMEMORY_SCOPE, stream.getTestId()); - Map<String, SystemStreamMetadata> metadata = - 
factory.getAdmin(systemName, new MapConfig(config)).getSystemStreamMetadata(streamNames); - SystemConsumer consumer = factory.getConsumer(systemName, new MapConfig(config), null); - metadata.get(stream.getPhysicalName()).getSystemStreamPartitionMetadata().keySet().forEach(partition -> { - SystemStreamPartition temp = new SystemStreamPartition(systemName, streamName, partition); + Config config = new MapConfig(outputDescriptor.toConfig(), outputDescriptor.getSystemDescriptor().toConfig()); + Map<String, SystemStreamMetadata> metadata = factory.getAdmin(systemName, config).getSystemStreamMetadata(streamIds); + SystemConsumer consumer = factory.getConsumer(systemName, config, null); + String name = (String) outputDescriptor.getPhysicalName().orElse(streamId); + metadata.get(name).getSystemStreamPartitionMetadata().keySet().forEach(partition -> { + SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition); ssps.add(temp); consumer.register(temp, "0"); }); @@ -337,12 +291,17 @@ public class TestRunner { Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>(); HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps); while (System.currentTimeMillis() < t + timeout.toMillis()) { - Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = consumer.poll(ssps, 10); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = null; + try { + currentState = consumer.poll(ssps, 10); + } catch (InterruptedException e) { + throw new SamzaException("Timed out while consuming stream \n" + e.getMessage()); + } for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) { SystemStreamPartition ssp = entry.getKey(); output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>()); List<IncomingMessageEnvelope> currentBuffer = entry.getValue(); - Integer totalMessagesToFetch = Integer.valueOf(metadata.get(stream.getStreamName()) + 
Integer totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId()) .getSystemStreamPartitionMetadata() .get(ssp.getPartition()) .getNewestOffset()); @@ -364,7 +323,7 @@ public class TestRunner { return output.entrySet() .stream() .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(), - entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList()))); + entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList()))); } private TaskFactory createTaskFactory() { @@ -388,4 +347,41 @@ public class TestRunner { throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask " + "or AsyncStreamTask", taskClass.getName())); } + + /** + * Creates an in memory stream with {@link InMemorySystemFactory} and feeds its partition with stream of messages + * @param partitonData key of the map represents partitionId and value represents + * messages in the partition + * @param descriptor describes a stream to initialize with the in memory system + */ + private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor, + Map<Integer, Iterable<StreamMessageType>> partitonData) { + String systemName = descriptor.getSystemName(); + String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId()); + if (configs.containsKey(TaskConfig.INPUT_STREAMS())) { + configs.put(TaskConfig.INPUT_STREAMS(), + configs.get(TaskConfig.INPUT_STREAMS()).concat("," + systemName + "." + streamName)); + } else { + configs.put(TaskConfig.INPUT_STREAMS(), systemName + "." 
+ streamName); + } + InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor(); + imsd.withInMemoryScope(this.inMemoryScope); + addConfigs(descriptor.toConfig()); + addConfigs(descriptor.getSystemDescriptor().toConfig()); + StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size()); + SystemFactory factory = new InMemorySystemFactory(); + Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); + factory.getAdmin(systemName, config).createStream(spec); + SystemProducer producer = factory.getProducer(systemName, config, null); + SystemStream sysStream = new SystemStream(systemName, streamName); + partitonData.forEach((partitionId, partition) -> { + partition.forEach(e -> { + Object key = e instanceof KV ? ((KV) e).getKey() : null; + Object value = e instanceof KV ? ((KV) e).getValue() : e; + producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value)); + }); + producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null, + new EndOfStreamMessage(null))); + }); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java deleted file mode 100644 index 320a0ac..0000000 --- a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.framework.stream; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -/** - * A CollectionStream represents an in memory stream of messages that can either have single or multiple partitions. - * Every CollectionStream is coupled with a {@link org.apache.samza.test.framework.system.CollectionStreamSystemSpec} that - * contains all the specification for system - *<p> - * When sending messages using {@code CollectionStream<KV<K, V>>}, messages use K as key and V as message - * When sending messages using {@code CollectionStream<T>}, messages use a nullkey. 
- *</p> - * @param <T> - * can represent a message with null key or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a - * {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope} - * and value represents the message of the same - */ -public class CollectionStream<T> { - private String testId; - private final String streamName; - private final String physicalName; - private final String systemName; - private Map<Integer, Iterable<T>> initPartitions; - private Map<String, String> streamConfig; - private static final String STREAM_TO_SYSTEM = "streams.%s.samza.system"; - private static final String PHYSICAL_NAME = "streams.%s.samza.physical.name"; - - /** - * Constructs a new CollectionStream from specified components. - * @param systemName represents name of the system stream is associated with - * @param streamName represents name of the stream - */ - private CollectionStream(String systemName, String streamName) { - Preconditions.checkNotNull(systemName); - Preconditions.checkNotNull(streamName); - this.systemName = systemName; - this.streamName = streamName; - this.streamConfig = new HashMap<>(); - // TODO: Once SAMZA-1737 is resolved, generate a randomized physical name - this.physicalName = streamName; - streamConfig.put(String.format(STREAM_TO_SYSTEM, this.streamName), systemName); - streamConfig.put(String.format(PHYSICAL_NAME, this.streamName), physicalName); - } - - - /** - * Constructs a new CollectionStream with multiple empty partitions from specified components. 
- * @param systemName represents name of the system stream is associated with - * @param streamName represents name of the stream - * @param partitionCount represents number of partitions, each of these partitions will be empty - */ - private CollectionStream(String systemName, String streamName, Integer partitionCount) { - this(systemName, streamName); - Preconditions.checkState(partitionCount > 0); - initPartitions = new HashMap<>(); - for (int i = 0; i < partitionCount; i++) { - initPartitions.put(i, new ArrayList<>()); - } - } - - /** - * Constructs a new CollectionStream with single partition from specified components. - * @param systemName represents name of the system stream is associated with - * @param streamName represents name of the stream - * @param initPartition represents the messages that the stream will be intialized with, default partitionId for the - * this single partition stream is 0 - */ - private CollectionStream(String systemName, String streamName, Iterable<T> initPartition) { - this(systemName, streamName); - Preconditions.checkNotNull(initPartition); - initPartitions = new HashMap<>(); - initPartitions.put(0, initPartition); - } - - /** - * Constructs a new CollectionStream with multiple partitions from specified components. - * @param systemName represents name of the system stream is associated with - * @param streamName represents name of the stream - * @param initPartitions represents the partition state, key of the map represents partitionId and value represents - * the messages that partition will be initialized with - */ - private CollectionStream(String systemName, String streamName, Map<Integer, ? 
extends Iterable<T>> initPartitions) { - this(systemName, streamName); - Preconditions.checkNotNull(initPartitions); - this.initPartitions = new HashMap<>(initPartitions); - } - - /** - * @return The Map of partitions that input stream is supposed to be initialized with, this method is - * used internally and should not be used for asserting over streams. - * The true state of stream is determined by {@code consmeStream()} of {@link org.apache.samza.test.framework.TestRunner} - */ - public Map<Integer, Iterable<T>> getInitPartitions() { - return initPartitions; - } - - public String getStreamName() { - return streamName; - } - - public String getSystemName() { - return systemName; - } - - public Map<String, String> getStreamConfig() { - return streamConfig; - } - - public String getTestId() { - return testId; - } - - public void setTestId(String testId) { - this.testId = testId; - } - - public String getPhysicalName() { - return physicalName; - } - - /** - * Creates an in memory stream with the name {@code streamName} and initializes the stream to only one partition - * - * @param systemName represents name of the system stream is associated with - * @param streamName represents the name of the Stream - * @param <T> represents the type of each message in a stream - * @return an {@link CollectionStream} with only one partition that can contain messages of the type - */ - public static <T> CollectionStream<T> empty(String systemName, String streamName) { - return new CollectionStream<>(systemName, streamName, 1); - } - - /** - * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions - * as specified by {@code partitionCount}. These partitions are empty and are supposed to be used by Samza job to produce - * messages to. 
- * - * @param systemName represents name of the system stream is associated with - * @param streamName represents the name of the Stream - * @param partitionCount represents the number of partitions the stream would have - * @param <T> represents the type of each message in a stream - * @return an empty {@link CollectionStream} with multiple partitions that can contain messages of the type {@code T} - */ - public static <T> CollectionStream<T> empty(String systemName, String streamName, int partitionCount) { - return new CollectionStream<>(systemName, streamName, partitionCount); - } - - /** - * Creates an in memory stream with the name {@code streamName}. Stream is created with single partition having - * {@code partitionId} is 0. This partition is intialzied with messages of type T - * - * @param systemName represents name of the system stream is associated with - * @param streamName represents the name of the Stream - * @param partition represents the messages that the {@link org.apache.samza.system.SystemStreamPartition} will be - * initialized with - * @param <T> represents the type of a message in the stream - * @return a {@link CollectionStream} with only one partition containing messages of the type {@code T} - * - */ - public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> partition) { - return new CollectionStream<>(systemName, streamName, partition); - } - - /** - * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions - * as the size of {@code partitions} map. Key of the map {@code partitions} represents the {@code partitionId} of - * each {@link org.apache.samza.Partition} for a {@link org.apache.samza.system.SystemStreamPartition} and value is - * an Iterable of messages that the {@link org.apache.samza.system.SystemStreamPartition} should be initialized with. 
- * - * @param systemName represents name of the system stream is associated with - * @param streamName represents the name of the Stream - * @param partitions Key of an entry in partitions represents a {@code partitionId} of a {@link org.apache.samza.Partition} - * and value represents the stream of messages the {@link org.apache.samza.system.SystemStreamPartition} - * will be initialized with - * @param <T> represents the type of a message in the stream - * @return a {@link CollectionStream} with multiple partitions each containing messages of the type {@code T} - * - */ - public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) { - return new CollectionStream<>(systemName, streamName, partitions); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java deleted file mode 100644 index 5658f61..0000000 --- a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.framework.system; - -import com.google.common.base.Preconditions; -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.apache.samza.system.inmemory.InMemorySystemFactory; - - -/** - * CollectionStreamSystem represents a system that interacts with an underlying {@link InMemorySystemFactory} to create - * various input and output streams and initialize {@link org.apache.samza.system.SystemStreamPartition} with messages - * <p> - * Following system level configs are set by default - * <ol> - * <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li> - * <li>"jobs.job-name.systems.%s.default.stream.samza.offset.default" = "oldest"</li> - * <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li> - * <li>"jobs.job-name.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li> - * </ol> - * The "systems.*" configs are required since the planner uses the system to get metadata about streams during - * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided - * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*" - * scope have the highest precedence. 
- */ -public class CollectionStreamSystemSpec { - private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; // prefix to override configs generated by the planner - private static final String SYSTEM_FACTORY = "systems.%s.samza.factory"; - private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default"; - - private String systemName; - private Map<String, String> systemConfigs; - - /** - * Constructs a new CollectionStreamSystem from specified components. - * <p> - * Every {@link CollectionStreamSystemSpec} is assumed to consume from the oldest offset, since stream is in memory and - * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams. - * <p> - * @param systemName represents unique name of the system - */ - private CollectionStreamSystemSpec(String systemName, String jobName) { - this.systemName = systemName; - systemConfigs = new HashMap<String, String>(); - systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName()); - systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_FACTORY, jobName, systemName), InMemorySystemFactory.class.getName()); - systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest"); - systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_OFFSET, jobName, systemName), "oldest"); - } - - public String getSystemName() { - return systemName; - } - - public Map<String, String> getSystemConfigs() { - return systemConfigs; - } - - /** - * Creates a {@link CollectionStreamSystemSpec} with name {@code systemName} - * @param systemName represents name of the {@link CollectionStreamSystemSpec} - * @param jobName name of the job - * @return an instance of {@link CollectionStreamSystemSpec} - */ - public static CollectionStreamSystemSpec create(String systemName, String jobName) { - Preconditions.checkState(StringUtils.isNotBlank(systemName)); - 
Preconditions.checkState(StringUtils.isNotBlank(jobName)); - return new CollectionStreamSystemSpec(systemName, jobName); - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java new file mode 100644 index 0000000..6065bf0 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework.system; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.serializers.NoOpSerde; + +/** + * A descriptor for an in memory stream of messages that can either have single or multiple partitions. + * <p> + * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}. 
+ * <p> + * @param <StreamMessageType> type of messages in input stream + */ +public class InMemoryInputDescriptor<StreamMessageType> + extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> { + /** + * Constructs a new InMemoryInputDescriptor from specified components. + * @param streamId id of the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) { + super(streamId, new NoOpSerde<>(), systemDescriptor, null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java new file mode 100644 index 0000000..75fe7ae --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.test.framework.system; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.NoOpSerde; + +/** + * A descriptor for an in memory output stream. + * <p> + * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}. + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. + */ +public class InMemoryOutputDescriptor<StreamMessageType> + extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> { + + /** + * Constructs an {@link OutputDescriptor} instance. + * @param streamId id of the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + InMemoryOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) { + super(streamId, new NoOpSerde<>(), systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java new file mode 100644 index 0000000..92b23ef --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework.system; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.InMemorySystemConfig; +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.inmemory.InMemorySystemFactory; +import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.test.framework.TestRunner; + + +/** + * A descriptor for InMemorySystem. + * System properties configured using a descriptor override corresponding properties provided in configuration. 
+ * <p> + * The following system-level configs are set by default: + * <ol> + * <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li> + * <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li> + * <li>"inmemory.scope" = a scope id generated to isolate the system in memory</li> + * </ol> + */ +public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor> + implements SimpleInputDescriptorProvider, OutputDescriptorProvider { + private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName(); + /** + * <p> + * The "systems.*" configs are required since the planner uses the system to get metadata about streams during + * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided + * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*" + * scope have the highest precedence. + * + * For this system, it generates the following overridden configs: + * <ol> + * <li>"jobs.&lt;job-name&gt;.systems.%s.default.stream.samza.offset.default" = "oldest"</li> + * <li>"jobs.&lt;job-name&gt;.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li> + * </ol> + * + **/ + private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; + private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default"; + + private String inMemoryScope; + + /** + * Constructs a new InMemorySystemDescriptor from specified components. + * <p> + * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since the stream is in memory and + * is used for testing purposes. The system uses {@link InMemorySystemFactory} to initialize in memory streams.
+ * <p> + * @param systemName unique name of the system + */ + public InMemorySystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, null, null); + this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST); + } + + @Override + public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor( + String streamId, Serde<StreamMessageType> serde) { + return new InMemoryInputDescriptor<StreamMessageType>(streamId, this); + } + + @Override + public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor( + String streamId, Serde<StreamMessageType> serde) { + return new InMemoryOutputDescriptor<StreamMessageType>(streamId, this); + } + + /** + * {@code inMemoryScope} defines the unique instance of InMemorySystem, that this system uses + * This method is framework use only, users are not supposed to use it + * + * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem + * @return this system descriptor + */ + public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) { + this.inMemoryScope = inMemoryScope; + return this; + } + + @Override + public Map<String, String> toConfig() { + HashMap<String, String> configs = new HashMap<>(super.toConfig()); + configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope); + configs.put(String.format(CONFIG_OVERRIDE_PREFIX + JavaSystemConfig.SYSTEM_FACTORY_FORMAT, TestRunner.JOB_NAME, getSystemName()), + FACTORY_CLASS_NAME); + configs.put( + String.format(CONFIG_OVERRIDE_PREFIX + DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, TestRunner.JOB_NAME, + getSystemName()), SystemStreamMetadata.OffsetType.OLDEST.toString()); + return configs; + } + +} + http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index 3a1eba0..581b1c3 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -27,7 +27,10 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.samza.operators.KV; -import org.apache.samza.test.framework.stream.CollectionStream; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.test.framework.system.InMemoryInputDescriptor; +import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; +import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Assert; import org.junit.Test; @@ -40,16 +43,21 @@ public class AsyncStreamTaskIntegrationTest { List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50); - CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList); - CollectionStream output = CollectionStream.empty("async-test", "ints-out"); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test"); + + InMemoryInputDescriptor<Integer> imid = isd + .getInputDescriptor("ints", new NoOpSerde<Integer>()); + + InMemoryOutputDescriptor imod = isd + .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner .of(MyAsyncStreamTask.class) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, inputList) + .addOutputStream(imod, 1) .run(Duration.ofSeconds(2)); - Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0), + Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0), 
IsIterableContainingInOrder.contains(outputList.toArray())); } @@ -58,49 +66,70 @@ public class AsyncStreamTaskIntegrationTest { List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputList = Arrays.asList(50, 10, 20, 30, 40); - CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList); - CollectionStream output = CollectionStream.empty("async-test", "ints-out"); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test"); + + InMemoryInputDescriptor<Integer> imid = isd + .getInputDescriptor("ints", new NoOpSerde<Integer>()); + + InMemoryOutputDescriptor imod = isd + .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner .of(MyAsyncStreamTask.class) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, inputList) + .addOutputStream(imod, 1) .run(Duration.ofSeconds(2)); - StreamAssert.containsInAnyOrder(output, outputList, Duration.ofMillis(1000)); + StreamAssert.containsInAnyOrder(outputList, imod, Duration.ofMillis(1000)); } @Test public void testAsyncTaskWithMultiplePartition() throws Exception { Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); - List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); - List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); - for (int i = 0; i < 5; i++) { - List<KV> keyedPartition = new ArrayList<>(); - for (Integer val : partition) { - keyedPartition.add(KV.of(i, val)); - } - inputPartitionData.put(i, keyedPartition); - expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); - } + genData(inputPartitionData, expectedOutputPartitionData); - CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData); - CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5); + InMemorySystemDescriptor isd = new 
InMemorySystemDescriptor("async-test"); + + InMemoryInputDescriptor<KV> imid = isd + .getInputDescriptor("ints", new NoOpSerde<KV>()); + InMemoryOutputDescriptor imod = isd + .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner .of(MyAsyncStreamTask.class) - .addInputStream(inputStream) - .addOutputStream(outputStream) + .addInputStream(imid, inputPartitionData) + .addOutputStream(imod, 5) .run(Duration.ofSeconds(2)); - StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); + StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); } @Test public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception { Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); + genData(inputPartitionData, expectedOutputPartitionData); + + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test"); + + InMemoryInputDescriptor<KV> imid = isd + .getInputDescriptor("ints", new NoOpSerde<>()); + + InMemoryOutputDescriptor imod = isd + .getOutputDescriptor("ints-out", new NoOpSerde<>()); + + TestRunner + .of(MyAsyncStreamTask.class) + .addInputStream(imid, inputPartitionData) + .addOutputStream(imod, 5) + .addOverrideConfig("task.max.concurrency", "4") + .run(Duration.ofSeconds(2)); + + StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); + } + + public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) { List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); for (int i = 0; i < 5; i++) { @@ -111,18 +140,6 @@ public class AsyncStreamTaskIntegrationTest { inputPartitionData.put(i, keyedPartition); expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); } - - 
CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData); - CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5); - - TestRunner - .of(MyAsyncStreamTask.class) - .addInputStream(inputStream) - .addOutputStream(outputStream) - .addOverrideConfig("task.max.concurrency", "4") - .run(Duration.ofSeconds(2)); - - StreamAssert.containsInAnyOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); } /** @@ -130,15 +147,20 @@ public class AsyncStreamTaskIntegrationTest { */ @Test(expected = AssertionError.class) public void testSamzaJobTimeoutFailureForAsyncTask() { - List<Integer> inputList = Arrays.asList(1, 2, 3, 4); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test"); + + InMemoryInputDescriptor<Integer> imid = isd + .getInputDescriptor("ints", new NoOpSerde<>()); - CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList); - CollectionStream output = CollectionStream.empty("async-test", "ints-out"); + InMemoryOutputDescriptor imod = isd + .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner .of(MyAsyncStreamTask.class) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, Arrays.asList(1, 2, 3, 4)) + .addOutputStream(imod, 1) .run(Duration.ofMillis(1)); } + + } http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 1000f22..6dd9159 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -36,7 +36,9 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.controlmessages.TestData; -import org.apache.samza.test.framework.stream.CollectionStream; +import org.apache.samza.test.framework.system.InMemoryInputDescriptor; +import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; +import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.junit.Assert; import org.junit.Test; @@ -82,17 +84,22 @@ public class StreamApplicationIntegrationTest { pageviews.add(pv); } - CollectionStream<PageView> input = CollectionStream.of("test", "PageView", pageviews); - CollectionStream output = CollectionStream.empty("test", "Output", 10); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<PageView> imid = isd + .getInputDescriptor("PageView", new NoOpSerde<PageView>()); + + InMemoryOutputDescriptor<PageView> imod = isd + .getOutputDescriptor("Output", new NoOpSerde<PageView>()); TestRunner .of(pageViewRepartition) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, pageviews) + .addOutputStream(imod, 10) .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1500)); - Assert.assertEquals(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1); + Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1); } public static final class Values { @@ -107,13 +114,18 @@ public class StreamApplicationIntegrationTest { @Test(expected = SamzaException.class) public void testSamzaJobStartMissingConfigFailureForStreamApplication() { - CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", new 
ArrayList<>()); - CollectionStream output = CollectionStream.empty("test", "Output", 10); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<PageView> imid = isd + .getInputDescriptor("PageView", new NoOpSerde<PageView>()); + + InMemoryOutputDescriptor<PageView> imod = isd + .getOutputDescriptor("Output", new NoOpSerde<PageView>()); TestRunner .of(pageViewRepartition) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, new ArrayList<>()) + .addOutputStream(imod, 10) .run(Duration.ofMillis(1000)); } @@ -131,12 +143,17 @@ public class StreamApplicationIntegrationTest { pageviews.add(new TestData.PageView(null, memberId)); } - CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", pageviews); - CollectionStream output = CollectionStream.empty("test", "Output", 1); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<PageView> imid = isd + .getInputDescriptor("PageView", new NoOpSerde<PageView>()); + + InMemoryOutputDescriptor<PageView> imod = isd + .getOutputDescriptor("Output", new NoOpSerde<PageView>()); TestRunner.of(pageViewFilter) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, pageviews) + .addOutputStream(imod, 10) .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1000)); } http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index f888b4a..0580598 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -28,7 +28,10 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.operators.KV; -import org.apache.samza.test.framework.stream.CollectionStream; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.test.framework.system.InMemoryInputDescriptor; +import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; +import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Assert; import org.junit.Test; @@ -41,13 +44,23 @@ public class StreamTaskIntegrationTest { List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50); - CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList); - CollectionStream output = CollectionStream.empty("test", "output"); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); - TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1)); + InMemoryInputDescriptor<Integer> imid = isd + .getInputDescriptor("input", new NoOpSerde<Integer>()); - Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0), + InMemoryOutputDescriptor<Integer> imod = isd + .getOutputDescriptor("output", new NoOpSerde<Integer>()); + + TestRunner + .of(MyStreamTestTask.class) + .addInputStream(imid, inputList) + .addOutputStream(imod, 1) + .run(Duration.ofSeconds(1)); + + Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0), IsIterableContainingInOrder.contains(outputList.toArray())); + } /** @@ -57,10 +70,19 @@ public class StreamTaskIntegrationTest { public void testSamzaJobFailureForSyncTask() { List<Double> inputList = Arrays.asList(1.2, 2.3, 3.33, 4.5); - CollectionStream<Double> input = 
CollectionStream.of("test", "doubles", inputList); - CollectionStream output = CollectionStream.empty("test", "output"); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); - TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1)); + InMemoryInputDescriptor<Double> imid = isd + .getInputDescriptor("doubles", new NoOpSerde<Double>()); + + InMemoryOutputDescriptor imod = isd + .getOutputDescriptor("output", new NoOpSerde<>()); + + TestRunner + .of(MyStreamTestTask.class) + .addInputStream(imid, inputList) + .addOutputStream(imod, 1) + .run(Duration.ofSeconds(1)); } @Test @@ -68,50 +90,72 @@ public class StreamTaskIntegrationTest { List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50); - CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList); - CollectionStream output = CollectionStream.empty("test", "output"); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<Integer> imid = isd + .getInputDescriptor("input", new NoOpSerde<Integer>()); + + InMemoryOutputDescriptor<Integer> imod = isd + .getOutputDescriptor("output", new NoOpSerde<Integer>()); TestRunner .of(MyStreamTestTask.class) - .addInputStream(input) - .addOutputStream(output) + .addInputStream(imid, inputList) + .addOutputStream(imod, 1) .addOverrideConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(1)); - StreamAssert.containsInOrder(output, outputList, Duration.ofMillis(1000)); + StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000)); } @Test public void testSyncTaskWithMultiplePartition() throws Exception { Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); - List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); - List<Integer> outputPartition = partition.stream().map(x -> x * 
10).collect(Collectors.toList()); - for (int i = 0; i < 5; i++) { - List<KV> keyedPartition = new ArrayList<>(); - for (Integer val : partition) { - keyedPartition.add(KV.of(i, val)); - } - inputPartitionData.put(i, keyedPartition); - expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); - } + genData(inputPartitionData, expectedOutputPartitionData); + + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<KV> imid = isd + .getInputDescriptor("input", new NoOpSerde<KV>()); - CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData); - CollectionStream outputStream = CollectionStream.empty("test", "output", 5); + InMemoryOutputDescriptor<Integer> imod = isd + .getOutputDescriptor("output", new NoOpSerde<Integer>()); TestRunner .of(MyStreamTestTask.class) - .addInputStream(inputStream) - .addOutputStream(outputStream) + .addInputStream(imid, inputPartitionData) + .addOutputStream(imod, 5) .run(Duration.ofSeconds(2)); - StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); + StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); } @Test public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception { Map<Integer, List<KV>> inputPartitionData = new HashMap<>(); Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>(); + genData(inputPartitionData, expectedOutputPartitionData); + + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<KV> imid = isd + .getInputDescriptor("input", new NoOpSerde<KV>()); + + InMemoryOutputDescriptor<Integer> imod = isd + .getOutputDescriptor("output", new NoOpSerde<Integer>()); + + TestRunner + .of(MyStreamTestTask.class) + .addInputStream(imid, inputPartitionData) + .addOutputStream(imod, 5) + .addOverrideConfig("job.container.thread.pool.size", "4") + .run(Duration.ofSeconds(2)); + + 
StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); + } + + public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) { List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); for (int i = 0; i < 5; i++) { @@ -122,17 +166,5 @@ public class StreamTaskIntegrationTest { inputPartitionData.put(i, keyedPartition); expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); } - - CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData); - CollectionStream outputStream = CollectionStream.empty("test", "output", 5); - - TestRunner - .of(MyStreamTestTask.class) - .addInputStream(inputStream) - .addOutputStream(outputStream) - .addOverrideConfig("job.container.thread.pool.size", "4") - .run(Duration.ofSeconds(2)); - - StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 5c067ad..adcea48 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import 
org.apache.samza.config.JobConfig; @@ -44,7 +45,9 @@ import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.table.Table; import org.apache.samza.test.framework.TestRunner; -import org.apache.samza.test.framework.stream.CollectionStream; +import org.apache.samza.test.framework.system.InMemoryInputDescriptor; +import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; +import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.junit.Test; @@ -84,25 +87,28 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName); - CollectionStream<PageView> pageViewStream = - CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews); - CollectionStream<Profile> profileStream = - CollectionStream.of(systemName, PROFILE_STREAM, profiles); + InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName); - CollectionStream<EnrichedPageView> outputStream = - CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM); + InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd + .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>()); + + InMemoryInputDescriptor<Profile> profileStreamDesc = isd + .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>()); + + InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd + .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>()); TestRunner .of(app) - .addInputStream(pageViewStream) - .addInputStream(profileStream) - .addOutputStream(outputStream) + .addInputStream(pageViewStreamDesc, pageViews) + .addInputStream(profileStreamDesc, profiles) + .addOutputStream(outputStreamDesc, 1) .addConfigs(new 
MapConfig(configs)) .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()) .run(Duration.ofMillis(100000)); try { - Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000)); + Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000)); List<EnrichedPageView> results = result.values().stream() .flatMap(List::stream) .collect(Collectors.toList()); @@ -117,7 +123,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness assertEquals("Mismatch between the expected and actual join count", results.size(), expectedEnrichedPageviews.size()); assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin); - } catch (InterruptedException e) { + } catch (SamzaException e) { e.printStackTrace(); } } @@ -163,4 +169,4 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness }); } } -} +} \ No newline at end of file
