Repository: samza Updated Branches: refs/heads/master 991ed99aa -> 15c5a1a7e
SAMZA-1648: Integration Test Framework & Collection Stream Impl This patch provides the following: - TestRunner: Testing wrapper to run a Samza job - CollectionStream: Acts as a stream descriptor for in memory collections - CollectionStreamSystem: System associated with a Collection - StreamUtils: Utilities over streams - Sample example of tests Link to SEP: https://cwiki.apache.org/confluence/display/SAMZA/SEP-12%3A+Integration+Test+Framework Author: sanil15 <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #501 from Sanil15/SAMZA-1648 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/15c5a1a7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/15c5a1a7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/15c5a1a7 Branch: refs/heads/master Commit: 15c5a1a7e3935bd1fc9808e81bb98ec9c98b14a2 Parents: 991ed99 Author: sanil15 <[email protected]> Authored: Fri Jun 22 15:19:50 2018 -0700 Committer: xiliu <[email protected]> Committed: Fri Jun 22 15:19:50 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/test/framework/TestRunner.java | 351 +++++++++++++++++++ .../test/framework/stream/CollectionStream.java | 204 +++++++++++ .../system/CollectionStreamSystemSpec.java | 79 +++++ .../AsyncStreamTaskIntegrationTest.java | 49 +++ .../samza/test/framework/MyAsyncStreamTask.java | 66 ++++ .../samza/test/framework/MyStreamTestTask.java | 37 ++ .../StreamApplicationIntegrationTest.java | 79 +++++ .../framework/StreamTaskIntegrationTest.java | 45 +++ 8 files changed, 910 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java new file mode 100644 index 0000000..dee10c6 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.framework; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.InMemorySystemConfig; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.operators.KV; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.inmemory.InMemorySystemFactory; +import org.apache.samza.task.AsyncStreamTask; +import org.apache.samza.task.StreamTask; +import org.apache.samza.test.framework.stream.CollectionStream; +import org.apache.samza.test.framework.system.CollectionStreamSystemSpec; + + +/** + * TestRunner provides apis to quickly set up tests for Samza low level and high level apis. Default running mode + * for test is Single container without any distributed coordination service. 
Test runner maintains global job config + * {@code configs} that are used to run the Samza job + * + * For single container mode following configs are set by default + * <ol> + * <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li> + * <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li> + * <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li> + * <li>"job.name" = "test-samza"</li> + * <li>"processor.id" = "1"</li> + * <li>"inmemory.scope = " Scope id generated to isolate the run for InMemorySystem</li> + * </ol> + * + */ +public class TestRunner { + + private static final String JOB_NAME = "test-samza"; + public enum Mode { + SINGLE_CONTAINER, MULTI_CONTAINER + } + + private Map<String, String> configs; + private Map<String, CollectionStreamSystemSpec> systems; + private Class taskClass; + private StreamApplication app; + private String testId; + private SystemFactory factory; + + /** + * Mode defines single or multi container running configuration, by default a single container configuration is assumed + */ + private Mode mode; + + private TestRunner() { + this.testId = RandomStringUtils.random(10, true, true); + this.systems = new HashMap<String, CollectionStreamSystemSpec>(); + this.configs = new HashMap<>(); + this.mode = Mode.SINGLE_CONTAINER; + this.factory = new InMemorySystemFactory(); + configs.put(InMemorySystemConfig.INMEMORY_SCOPE, testId); + configs.put(JobConfig.JOB_NAME(), JOB_NAME); + configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1"); + configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); + configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + } + + /** + * Constructs a new {@link TestRunner} from following components + * @param 
taskClass represent a class containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask} + */ + private TestRunner(Class taskClass) { + this(); + Preconditions.checkNotNull(taskClass); + configs.put(TaskConfig.TASK_CLASS(), taskClass.getName()); + this.taskClass = taskClass; + } + + /** + * Constructs a new {@link TestRunner} from following components + * @param app represent a class containing Samza job logic implementing {@link StreamApplication} + */ + private TestRunner(StreamApplication app) { + this(); + Preconditions.checkNotNull(app); + this.app = app; + } + + /** + * Registers a system with TestRunner if not already registered and configures all the system configs to global + * job configs + */ + private void registerSystem(String systemName) { + if (!systems.containsKey(systemName)) { + systems.put(systemName, CollectionStreamSystemSpec.create(systemName)); + configs.putAll(systems.get(systemName).getSystemConfigs()); + } + } + + /** + * Creates an instance of {@link TestRunner} for Low Level Samza Api + * @param taskClass represent a class extending either {@link StreamTask} or {@link AsyncStreamTask} + * @return a {@link TestRunner} for {@code taskClass} + */ + public static TestRunner of(Class taskClass) { + Preconditions.checkNotNull(taskClass); + Preconditions.checkState( + StreamTask.class.isAssignableFrom(taskClass) || AsyncStreamTask.class.isAssignableFrom(taskClass)); + return new TestRunner(taskClass); + } + + /** + * Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api + * @param app represent a class representing Samza job by implementing {@link StreamApplication} + * @return a {@link TestRunner} for {@code app} + */ + public static TestRunner of(StreamApplication app) { + Preconditions.checkNotNull(app); + return new TestRunner(app); + } + + /** + * Only adds a config from {@code config} to global {@code configs} if they dont exist in it. 
+ * @param config represents the {@link Config} supposed to be added to global configs + * @return calling instance of {@link TestRunner} with added configs if they don't exist + */ + public TestRunner addConfigs(Config config) { + Preconditions.checkNotNull(config); + config.forEach(this.configs::putIfAbsent); + return this; + } + + /** + * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already + * exisiting in {@code configs} + * @param key key of the config + * @param value value of the config + * @return calling instance of {@link TestRunner} with added config + */ + public TestRunner addOverrideConfig(String key, String value) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + configs.put(key, value); + return this; + } + + /** + * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs. + * <p> + * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with + * {@link TestRunner} if not registered already. Then it creates and initializes the stream partitions with messages for + * the registered System + * <p> + * @param stream represents the stream that is supposed to be configured with {@link TestRunner} + * @return calling instance of {@link TestRunner} with {@code stream} configured with it + */ + public TestRunner addInputStream(CollectionStream stream) { + Preconditions.checkNotNull(stream); + registerSystem(stream.getSystemName()); + initializeInput(stream); + stream.setTestId(testId); + if (configs.containsKey(TaskConfig.INPUT_STREAMS())) { + configs.put(TaskConfig.INPUT_STREAMS(), + configs.get(TaskConfig.INPUT_STREAMS()).concat("," + stream.getSystemName() + "." + stream.getPhysicalName())); + } else { + configs.put(TaskConfig.INPUT_STREAMS(), stream.getSystemName() + "." 
+ stream.getPhysicalName()); + } + stream.getStreamConfig().forEach((key, val) -> { + configs.putIfAbsent((String) key, (String) val); + }); + + return this; + } + + /** + * Creates an in memory stream with {@link InMemorySystemFactory} and initializes the metadata for the stream. + * Initializes each partition of that stream with messages from {@code stream.getInitPartitions} + * + * @param stream represents the stream to initialize with the in memory system + * @param <T> can represent a message or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a + * {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope} + * and value represents the message + */ + private <T> void initializeInput(CollectionStream stream) { + Preconditions.checkNotNull(stream); + Preconditions.checkState(stream.getInitPartitions().size() >= 1); + String streamName = stream.getStreamName(); + String systemName = stream.getSystemName(); + Map<Integer, Iterable<T>> partitions = stream.getInitPartitions(); + StreamSpec spec = new StreamSpec(streamName, stream.getPhysicalName(), systemName, partitions.size()); + factory.getAdmin(systemName, new MapConfig(configs)).createStream(spec); + SystemProducer producer = factory.getProducer(systemName, new MapConfig(configs), null); + partitions.forEach((partitionId, partition) -> { + partition.forEach(e -> { + Object key = e instanceof KV ? ((KV) e).getKey() : null; + Object value = e instanceof KV ? 
((KV) e).getValue() : e; + producer.send(systemName, + new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), key, + value)); + }); + producer.send(systemName, + new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), null, + new EndOfStreamMessage(null))); + }); + } + + /** + * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs. + * <p> + * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with + * {@link TestRunner} if not registered already. Then it creates the stream partitions with the registered System + * <p> + * @param stream represents the stream that is supposed to be configured with {@link TestRunner} + * @return calling instance of {@link TestRunner} with {@code stream} configured with it + */ + public TestRunner addOutputStream(CollectionStream stream) { + Preconditions.checkNotNull(stream); + Preconditions.checkState(stream.getInitPartitions().size() >= 1); + registerSystem(stream.getSystemName()); + stream.setTestId(testId); + StreamSpec spec = new StreamSpec(stream.getStreamName(), stream.getPhysicalName(), stream.getSystemName(), stream.getInitPartitions().size()); + factory + .getAdmin(stream.getSystemName(), new MapConfig(configs)) + .createStream(spec); + configs.putAll(stream.getStreamConfig()); + return this; + } + + /** + * Utility to run a test configured using TestRunner + */ + public void run() { + Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null), + "TestRunner should run for Low Level Task api or High Level Application Api"); + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + if (app == null) { + runner.runTask(); + runner.waitForFinish(); + } else { + runner.run(app); + runner.waitForFinish(); + } + } + + /** + * 
Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the + * TestRunner in order to assert over the streams (ex output streams). + * + * @param stream represents {@link CollectionStream} whose current state of partitions is requested to be fetched + * @param timeout poll timeout in Ms + * @param <T> represents type of message + * + * @return a map key of which represents the {@code partitionId} and value represents the current state of the partition + * i.e messages in the partition + * @throws InterruptedException Thrown when a blocking poll has been interrupted by another thread. + */ + public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Integer timeout) throws InterruptedException { + Preconditions.checkNotNull(stream); + Preconditions.checkNotNull(stream.getSystemName()); + String streamName = stream.getStreamName(); + String systemName = stream.getSystemName(); + Set<SystemStreamPartition> ssps = new HashSet<>(); + Set<String> streamNames = new HashSet<>(); + streamNames.add(streamName); + SystemFactory factory = new InMemorySystemFactory(); + HashMap<String, String> config = new HashMap<>(); + config.put(InMemorySystemConfig.INMEMORY_SCOPE, stream.getTestId()); + Map<String, SystemStreamMetadata> metadata = + factory.getAdmin(systemName, new MapConfig(config)).getSystemStreamMetadata(streamNames); + SystemConsumer consumer = factory.getConsumer(systemName, new MapConfig(config), null); + metadata.get(stream.getPhysicalName()).getSystemStreamPartitionMetadata().keySet().forEach(partition -> { + SystemStreamPartition temp = new SystemStreamPartition(systemName, streamName, partition); + ssps.add(temp); + consumer.register(temp, "0"); + }); + + long t = System.currentTimeMillis(); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>(); + HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps); + while (System.currentTimeMillis() < t 
+ timeout) { + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = consumer.poll(ssps, 10); + for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) { + SystemStreamPartition ssp = entry.getKey(); + output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>()); + List<IncomingMessageEnvelope> currentBuffer = entry.getValue(); + Integer totalMessagesToFetch = Integer.valueOf(metadata.get(stream.getStreamName()) + .getSystemStreamPartitionMetadata() + .get(ssp.getPartition()) + .getNewestOffset()); + if (output.get(ssp).size() + currentBuffer.size() == totalMessagesToFetch) { + didNotReachEndOfStream.remove(entry.getKey()); + ssps.remove(entry.getKey()); + } + output.get(ssp).addAll(currentBuffer); + } + if (didNotReachEndOfStream.isEmpty()) { + break; + } + } + + if (!didNotReachEndOfStream.isEmpty()) { + throw new IllegalStateException("Could not poll for all system stream partitions"); + } + + return output.entrySet() + .stream() + .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(), + entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList()))); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java new file mode 100644 index 0000000..b3d9485 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework.stream; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * A CollectionStream represents an in memory stream of messages that can either have single or multiple partitions. + * Every CollectionStream is coupled with a {@link org.apache.samza.test.framework.system.CollectionStreamSystemSpec} that + * contains all the specification for system + *<p> + * When sending messages using {@code CollectionStream<KV<K, V>>}, messages use K as key and V as message + * When sending messages using {@code CollectionStream<T>}, messages use a nullkey. 
+ *</p> + * @param <T> + * can represent a message with null key or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a + * {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope} + * and value represents the message of the same + */ +public class CollectionStream<T> { + private String testId; + private final String streamName; + private final String physicalName; + private final String systemName; + private Map<Integer, Iterable<T>> initPartitions; + private Map<String, String> streamConfig; + private static final String STREAM_TO_SYSTEM = "streams.%s.samza.system"; + private static final String PHYSICAL_NAME = "streams.%s.samza.physical.name"; + + /** + * Constructs a new CollectionStream from specified components. + * @param systemName represents name of the system stream is associated with + * @param streamName represents name of the stream + */ + private CollectionStream(String systemName, String streamName) { + Preconditions.checkNotNull(systemName); + Preconditions.checkNotNull(streamName); + this.systemName = systemName; + this.streamName = streamName; + this.streamConfig = new HashMap<>(); + // TODO: Once SAMZA-1737 is resolved, generate a randomized physical name + this.physicalName = streamName; + streamConfig.put(String.format(STREAM_TO_SYSTEM, this.streamName), systemName); + streamConfig.put(String.format(PHYSICAL_NAME, this.streamName), physicalName); + } + + + /** + * Constructs a new CollectionStream with multiple empty partitions from specified components. 
+ * @param systemName represents name of the system stream is associated with + * @param streamName represents name of the stream + * @param partitionCount represents number of partitions, each of these partitions will be empty + */ + private CollectionStream(String systemName, String streamName, Integer partitionCount) { + this(systemName, streamName); + Preconditions.checkState(partitionCount > 0); + initPartitions = new HashMap<>(); + for (int i = 0; i < partitionCount; i++) { + initPartitions.put(i, new ArrayList<>()); + } + } + + /** + * Constructs a new CollectionStream with single partition from specified components. + * @param systemName represents name of the system stream is associated with + * @param streamName represents name of the stream + * @param initPartition represents the messages that the stream will be intialized with, default partitionId for the + * this single partition stream is 0 + */ + private CollectionStream(String systemName, String streamName, Iterable<T> initPartition) { + this(systemName, streamName); + Preconditions.checkNotNull(initPartition); + initPartitions = new HashMap<>(); + initPartitions.put(0, initPartition); + } + + /** + * Constructs a new CollectionStream with multiple partitions from specified components. + * @param systemName represents name of the system stream is associated with + * @param streamName represents name of the stream + * @param initPartitions represents the partition state, key of the map represents partitionId and value represents + * the messages that partition will be initialized with + */ + private CollectionStream(String systemName, String streamName, Map<Integer, ? 
extends Iterable<T>> initPartitions) { + this(systemName, streamName); + Preconditions.checkNotNull(initPartitions); + initPartitions = new HashMap<>(initPartitions); + } + + /** + * @return The Map of partitions that input stream is supposed to be initialized with, this method is + * used internally and should not be used for asserting over streams. + * The true state of stream is determined by {@code consmeStream()} of {@link org.apache.samza.test.framework.TestRunner} + */ + public Map<Integer, Iterable<T>> getInitPartitions() { + return initPartitions; + } + + public String getStreamName() { + return streamName; + } + + public String getSystemName() { + return systemName; + } + + public Map<String, String> getStreamConfig() { + return streamConfig; + } + + public String getTestId() { + return testId; + } + + public void setTestId(String testId) { + this.testId = testId; + } + + public String getPhysicalName() { + return physicalName; + } + + /** + * Creates an in memory stream with the name {@code streamName} and initializes the stream to only one partition + * + * @param systemName represents name of the system stream is associated with + * @param streamName represents the name of the Stream + * @param <T> represents the type of each message in a stream + * @return an {@link CollectionStream} with only one partition that can contain messages of the type + */ + public static <T> CollectionStream<T> empty(String systemName, String streamName) { + return new CollectionStream<>(systemName, streamName, 1); + } + + /** + * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions + * as specified by {@code partitionCount}. These partitions are empty and are supposed to be used by Samza job to produce + * messages to. 
+ * + * @param systemName represents name of the system stream is associated with + * @param streamName represents the name of the Stream + * @param partitionCount represents the number of partitions the stream would have + * @param <T> represents the type of each message in a stream + * @return an empty {@link CollectionStream} with multiple partitions that can contain messages of the type {@code T} + */ + public static <T> CollectionStream<T> empty(String systemName, String streamName, int partitionCount) { + return new CollectionStream<>(systemName, streamName, partitionCount); + } + + /** + * Creates an in memory stream with the name {@code streamName}. Stream is created with single partition having + * {@code partitionId} is 0. This partition is intialzied with messages of type T + * + * @param systemName represents name of the system stream is associated with + * @param streamName represents the name of the Stream + * @param partition represents the messages that the {@link org.apache.samza.system.SystemStreamPartition} will be + * initialized with + * @param <T> represents the type of a message in the stream + * @return a {@link CollectionStream} with only one partition containing messages of the type {@code T} + * + */ + public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> partition) { + return new CollectionStream<>(systemName, streamName, partition); + } + + /** + * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions + * as the size of {@code partitions} map. Key of the map {@code partitions} represents the {@code partitionId} of + * each {@link org.apache.samza.Partition} for a {@link org.apache.samza.system.SystemStreamPartition} and value is + * an Iterable of messages that the {@link org.apache.samza.system.SystemStreamPartition} should be initialized with. 
+ * + * @param systemName represents name of the system stream is associated with + * @param streamName represents the name of the Stream + * @param partitions Key of an entry in partitions represents a {@code partitionId} of a {@link org.apache.samza.Partition} + * and value represents the stream of messages the {@link org.apache.samza.system.SystemStreamPartition} + * will be initialized with + * @param <T> represents the type of a message in the stream + * @return a {@link CollectionStream} with multiple partitions each containing messages of the type {@code T} + * + */ + public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) { + return new CollectionStream<>(systemName, streamName, partitions); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java new file mode 100644 index 0000000..c005c41 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework.system; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.system.inmemory.InMemorySystemFactory; + + +/** + * CollectionStreamSystem represents a system that interacts with an underlying {@link InMemorySystemFactory} to create + * various input and output streams and initialize {@link org.apache.samza.system.SystemStreamPartition} with messages + * <p> + * Following system level configs are set by default + * <ol> + * <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li> + * <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li> + * </ol> + * + */ +public class CollectionStreamSystemSpec { + private static final String SYSTEM_FACTORY = "systems.%s.samza.factory"; + private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default"; + + private String systemName; + private Map<String, String> systemConfigs; + + /** + * Constructs a new CollectionStreamSystem from specified components. + * <p> + * Every {@link CollectionStreamSystemSpec} is assumed to consume from the oldest offset, since stream is in memory and + * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams. 
+ * <p> + * @param systemName represents unique name of the system + */ + private CollectionStreamSystemSpec(String systemName) { + this.systemName = systemName; + systemConfigs = new HashMap<String, String>(); + systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName()); + systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest"); + } + + public String getSystemName() { + return systemName; + } + + public Map<String, String> getSystemConfigs() { + return systemConfigs; + } + + /** + * Creates a {@link CollectionStreamSystemSpec} with name {@code name} + * @param name represents name of the {@link CollectionStreamSystemSpec} + * @return an instance of {@link CollectionStreamSystemSpec} + */ + public static CollectionStreamSystemSpec create(String name) { + Preconditions.checkState(name != null); + return new CollectionStreamSystemSpec(name); + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java new file mode 100644 index 0000000..c991b8c --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.test.framework;

import java.util.Arrays;
import java.util.List;
import org.apache.samza.test.framework.stream.CollectionStream;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;


public class AsyncStreamTaskIntegrationTest {

  /**
   * Runs {@link MyAsyncStreamTask} over the in-memory stream "ints" of system "async-test" and verifies that
   * partition 0 of "ints-out" holds the expected values in order.
   */
  @Test
  public void testAsyncTaskWithSinglePartition() throws Exception {
    List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5);
    List<Integer> expectedOutputs = Arrays.asList(10, 20, 30, 40, 50);

    CollectionStream<Integer> source = CollectionStream.of("async-test", "ints", inputs);
    CollectionStream sink = CollectionStream.empty("async-test", "ints-out");

    TestRunner.of(MyAsyncStreamTask.class).addInputStream(source).addOutputStream(sink).run();

    // Only partition 0 is inspected; the consume timeout is 1000.
    Assert.assertThat(
        TestRunner.consumeStream(sink, 1000).get(0),
        IsIterableContainingInOrder.contains(expectedOutputs.toArray()));
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java new file mode 100644 index 0000000..347e766 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.test.framework;

import java.util.Random;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;


/**
 * Sample {@link AsyncStreamTask} that hands every incoming message to a {@link RestCall} thread, which emits the
 * message multiplied by 10 after a random delay.
 */
public class MyAsyncStreamTask implements AsyncStreamTask {
  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator,
      final TaskCallback callback) {
    // Mimic a random callback delay and send message
    RestCall call = new RestCall(envelope, collector, callback);
    call.start();
  }
}

/**
 * Thread that sleeps for a random time below 150 ms, sends {@code message * 10} to the stream "ints-out" of system
 * "async-test" and then reports the outcome through the {@link TaskCallback}.
 */
class RestCall extends Thread {
  static Random random = new Random();
  final IncomingMessageEnvelope envelope;
  final MessageCollector messageCollector;
  final TaskCallback callback;

  RestCall(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCallback callback) {
    this.envelope = envelope;
    this.callback = callback;
    this.messageCollector = collector;
  }

  @Override
  public void run() {
    try {
      // Let the thread sleep for a while.
      Thread.sleep(random.nextInt(150));
    } catch (InterruptedException e) {
      System.out.println("Thread " + this.getName() + " interrupted.");
      // Restore the interrupt flag; the delay is only simulated, so the message is still processed.
      Thread.currentThread().interrupt();
    }

    try {
      Integer obj = (Integer) envelope.getMessage();
      messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("async-test", "ints-out"), obj * 10));
    } catch (RuntimeException e) {
      // Report the failure instead of dying silently, otherwise the callback would never be invoked.
      callback.failure(e);
      return;
    }
    callback.complete();
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java new file mode 100644 index 0000000..c83e461 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

package org.apache.samza.test.framework;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;


/**
 * Sample {@link StreamTask} that multiplies every incoming Integer message by 10 and sends the result to the
 * stream "output" of system "test".
 */
public class MyStreamTestTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    Integer received = (Integer) envelope.getMessage();
    SystemStream destination = new SystemStream("test", "output");
    OutgoingMessageEnvelope outgoing = new OutgoingMessageEnvelope(destination, received * 10);
    collector.send(outgoing);
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java new file mode 100644 index 0000000..307c1b5 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.samza.test.framework;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.test.framework.stream.CollectionStream;
import static org.apache.samza.test.controlmessages.TestData.PageView;
import org.junit.Assert;
import org.junit.Test;


/**
 * Runs a high level API {@link StreamApplication} against in-memory streams of the system "test".
 */
public class StreamApplicationIntegrationTest {

  /**
   * Reads the "PageView" input stream, keeps only the value of every message, repartitions by member id and sends
   * each repartitioned message to the stream "Output" of system "test".
   */
  final StreamApplication app = (streamGraph, cfg) -> {
      streamGraph.<KV<String, PageView>>getInputStream("PageView")
          .map(Values.create())
          .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
          .sink((m, collector, coordinator) -> {
              collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "Output"),
                  m.getKey(), m.getKey(),
                  m));
            });
    };

  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};

  @Test
  public void testHighLevelApi() throws Exception {
    Random random = new Random();
    int count = 10;
    List<PageView> pageviews = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      // The bound of nextInt is exclusive, so PAGEKEYS.length makes every key (including the last) selectable.
      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length)];
      int memberId = i;
      pageviews.add(new PageView(pagekey, memberId));
    }

    CollectionStream<PageView> input = CollectionStream.of("test", "PageView", pageviews);
    CollectionStream output = CollectionStream.empty("test", "Output", 10);

    TestRunner
        .of(app)
        .addInputStream(input)
        .addOutputStream(output)
        .addOverrideConfig("job.default.system", "test")
        .run();

    // A randomly chosen output partition is checked; JUnit expects the arguments as (expected, actual).
    Assert.assertEquals(1, TestRunner.consumeStream(output, 10000).get(random.nextInt(count)).size());
  }

  /**
   * Factory for a {@link MapFunction} that extracts the value from a {@link KV} message.
   */
  public static final class Values {
    public static <K, V, M extends KV<K, V>> MapFunction<M, V> create() {
      return (M m) -> m.getValue();
    }
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/15c5a1a7/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java new file mode 100644 index 0000000..e052539 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

package org.apache.samza.test.framework;

import java.util.Arrays;
import java.util.List;
import org.apache.samza.test.framework.stream.CollectionStream;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskIntegrationTest {

  /**
   * Runs {@link MyStreamTestTask} over the in-memory stream "input" of system "test" and verifies that partition 0
   * of "output" holds the expected values in order.
   */
  @Test
  public void testSyncTaskWithSinglePartition() throws Exception {
    List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5);
    List<Integer> expectedOutputs = Arrays.asList(10, 20, 30, 40, 50);

    CollectionStream<Integer> source = CollectionStream.of("test", "input", inputs);
    CollectionStream sink = CollectionStream.empty("test", "output");

    TestRunner
        .of(MyStreamTestTask.class)
        .addInputStream(source)
        .addOutputStream(sink)
        .run();

    // Only partition 0 is inspected; the consume timeout is 1000.
    Assert.assertThat(
        TestRunner.consumeStream(sink, 1000).get(0),
        IsIterableContainingInOrder.contains(expectedOutputs.toArray()));
  }

}
