Repository: samza Updated Branches: refs/heads/master e753c8de3 -> c91da7842
SAMZA-1226: relax type parameters in MessageStream functions relax the type parameter in user supplied functions in fluent API Author: Yi Pan (Data Infrastructure) <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Navina Ramesh <[email protected]>, Jacob Maes <[email protected]> Closes #133 from nickpan47/SAMZA-1226 and squashes the following commits: b8d3461 [Yi Pan (Data Infrastructure)] SAMZA-1226: cleanup code example in StreamApplication javadoc 93fa471 [Yi Pan (Data Infrastructure)] SAMZA-1226: added more unit tests for type-cast functions 18e1e9f [Yi Pan (Data Infrastructure)] SAMZA-1226: address review feedbacks b5da53b [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1226 7981b83 [Yi Pan (Data Infrastructure)] SAMZA-1226: relax type parameters in MessageStream functions Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c91da784 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c91da784 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c91da784 Branch: refs/heads/master Commit: c91da78428b71bd96a2a3724fa9fe8bfb1dc4f64 Parents: e753c8d Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Fri Apr 21 16:14:41 2017 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Apr 21 16:14:41 2017 -0700 ---------------------------------------------------------------------- .../samza/application/StreamApplication.java | 37 +++- .../apache/samza/operators/MessageStream.java | 16 +- .../org/apache/samza/operators/StreamGraph.java | 4 +- .../apache/samza/operators/windows/Windows.java | 31 +-- .../java/org/apache/samza/task/TaskContext.java | 3 +- .../apache/samza/config/ApplicationConfig.java | 5 + .../samza/operators/MessageStreamImpl.java | 40 ++-- .../apache/samza/operators/StreamGraphImpl.java | 33 +++- .../samza/operators/spec/OperatorSpecs.java | 16 +- .../operators/spec/StreamOperatorSpec.java | 2 +- .../stream/IntermediateStreamInternalImpl.java | 4 +- .../org/apache/samza/task/TaskFactoryUtil.java | 8 +- .../samza/execution/TestExecutionPlanner.java | 20 +- .../execution/TestJobGraphJsonGenerator.java | 15 +- .../samza/operators/TestMessageStreamImpl.java | 95 +++++++-- .../samza/operators/TestStreamGraphImpl.java | 193 +++++++++++++++++++ .../samza/operators/data/MessageType.java | 37 ++++ .../data/TestExtOutputMessageEnvelope.java | 29 +++ .../data/TestInputMessageEnvelope.java | 32 +++ .../operators/data/TestMessageEnvelope.java | 17 -- .../samza/operators/spec/TestOperatorSpecs.java | 61 +++++- .../apache/samza/task/TestTaskFactoryUtil.java | 27 +-- .../test/integration/StreamTaskTestUtil.scala | 2 +- 23 files changed, 601 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java index eeece10..a26c5af 100644 --- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java +++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java @@ -24,11 +24,43 @@ import org.apache.samza.operators.StreamGraph; /** - * This interface defines a template for stream application that user will implement to create operator DAG in {@link StreamGraph}. + * This interface defines a template for stream application that user will implement to initialize operator DAG in {@link StreamGraph}. + * + * <p> + * User program implements {@link StreamApplication#init(StreamGraph, Config)} method to initialize the transformation logic + * from all input streams to output streams. A simple user code example is shown below: + * </p> + * + * <pre>{@code + * public class PageViewCounterExample implements StreamApplication { + * // max timeout is 60 seconds + * private static final MAX_TIMEOUT = 60000; + * + * public void init(StreamGraph graph, Config config) { + * MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m); + * OutputStream<String, PageViewEvent, PageViewEvent> pageViewEventFilteredStream = graph + * .getOutputStream("pageViewEventFiltered", m -> m.memberId, m -> m); + * + * pageViewEvents + * .filter(m -> !(m.getMessage().getEventTime() < System.currentTimeMillis() - MAX_TIMEOUT)) + * .sendTo(pageViewEventFilteredStream); + * } + * + * // local execution mode + * public static void main(String[] args) { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * PageViewCounterExample userApp = new PageViewCounterExample(); + * ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + * localRunner.run(userApp); + * } + * + * } + * }</pre> + * */ @InterfaceStability.Unstable public interface StreamApplication { - static final String APP_CLASS_CONFIG = "app.class"; /** * Users are required to implement this abstract method to initialize the processing logic of the application, in terms @@ -38,4 +70,5 @@ public interface StreamApplication { * @param config the {@link Config} of the application */ void init(StreamGraph graph, Config config); + } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index 345bff0..c406a93 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -50,7 +50,7 @@ public interface MessageStream<M> { * @param <TM> the type of messages in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ - <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn); + <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn); /** * Applies the provided 1:n function to transform a message in this {@link MessageStream} @@ -60,7 +60,7 @@ public interface MessageStream<M> { * @param <TM> the type of messages in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ - <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn); + <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn); /** * Applies the provided function to messages in this {@link MessageStream} and returns the @@ -72,7 +72,7 @@ public interface MessageStream<M> { * @param filterFn the predicate to filter messages from this {@link MessageStream} * @return the transformed {@link MessageStream} */ - MessageStream<M> filter(FilterFunction<M> filterFn); + MessageStream<M> filter(FilterFunction<? super M> filterFn); /** * Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}. @@ -83,7 +83,7 @@ public interface MessageStream<M> { * * @param sinkFn the function to send messages in this stream to an external system */ - void sink(SinkFunction<M> sinkFn); + void sink(SinkFunction<? super M> sinkFn); /** * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}. @@ -120,10 +120,10 @@ public interface MessageStream<M> { * @param ttl the ttl for messages in each stream * @param <K> the type of join key * @param <OM> the type of messages in the other stream - * @param <RM> the type of messages resulting from the {@code joinFn} + * @param <TM> the type of messages resulting from the {@code joinFn} * @return the joined {@link MessageStream} */ - <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn, Duration ttl); + <K, OM, TM> MessageStream<TM> join(MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl); /** * Merge all {@code otherStreams} with this {@link MessageStream}. @@ -133,7 +133,7 @@ public interface MessageStream<M> { * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream} * @return the merged {@link MessageStream} */ - MessageStream<M> merge(Collection<MessageStream<M>> otherStreams); + MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams); /** * Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes @@ -144,6 +144,6 @@ public interface MessageStream<M> { * @param <K> the type of output message key and partition key * @return the repartitioned {@link MessageStream} */ - <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor); + <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor); } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java index ff1c580..a03f7c3 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -40,7 +40,7 @@ public interface StreamGraph { * @param <M> the type of message in the input {@link MessageStream} * @return the input {@link MessageStream} */ - <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder); + <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder); /** * Gets the {@link OutputStream} corresponding to the logical {@code streamId}. @@ -54,7 +54,7 @@ public interface StreamGraph { * @return the output {@link MessageStream} */ <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId, - Function<M, K> keyExtractor, Function<M, V> msgExtractor); + Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor); /** * Sets the {@link ContextManager} for this {@link StreamGraph}. http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 9192fc1..721b4c0 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -119,11 +119,12 @@ public final class Windows { * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function. */ - public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> keyFn, Duration interval, - Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) { + public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval, + Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); - return new WindowInternal<M, K, WV>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.TUMBLING); + return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, + (Function<M, K>) keyFn, null, WindowType.TUMBLING); } @@ -147,10 +148,10 @@ public final class Windows { * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) { + public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval) { FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); - Supplier<Collection<M>> initialValue = () -> new ArrayList<>(); + Supplier<Collection<M>> initialValue = ArrayList::new; return keyedTumblingWindow(keyFn, interval, initialValue, aggregator); } @@ -175,10 +176,11 @@ public final class Windows { * @param <WV> the type of the {@link WindowPane} output value * @return the created {@link Window} function */ - public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<WV> initialValue, - FoldLeftFunction<M, WV> foldFn) { + public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue, + FoldLeftFunction<? super M, WV> foldFn) { Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration)); - return new WindowInternal<>(defaultTrigger, initialValue, foldFn, null, null, WindowType.TUMBLING); + return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, + null, null, WindowType.TUMBLING); } /** @@ -203,7 +205,7 @@ public final class Windows { public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) { FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); - Supplier<Collection<M>> initialValue = () -> new ArrayList<>(); + Supplier<Collection<M>> initialValue = ArrayList::new; return tumblingWindow(duration, initialValue, aggregator); } @@ -235,10 +237,11 @@ public final class Windows { * @param <WV> the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, - Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) { + public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap, + Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) { Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); - return new WindowInternal<>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.SESSION); + return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, (Function<M, K>) keyFn, + null, WindowType.SESSION); } /** @@ -265,11 +268,11 @@ public final class Windows { * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) { + public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap) { FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); - Supplier<Collection<M>> initialValue = () -> new ArrayList<>(); + Supplier<Collection<M>> initialValue = ArrayList::new; return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator); } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 128cff1..dc5742f 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -58,10 +58,9 @@ public interface TaskContext { /** * Method to allow user to return customized context * - * @param <T> the type of user-defined task context * @return user-defined task context object */ - default <T> T getUserDefinedContext() { + default Object getUserDefinedContext() { return null; }; } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java index 1c00735..9eb4161 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java @@ -46,6 +46,7 @@ public class ApplicationConfig extends MapConfig { public static final String APP_COORDINATION_SERVICE_FACTORY_CLASS = "app.coordination.service.factory.class"; public static final String APP_NAME = "app.name"; public static final String APP_ID = "app.id"; + public static final String APP_CLASS = "app.class"; public ApplicationConfig(Config config) { super(config); @@ -67,6 +68,10 @@ public class ApplicationConfig extends MapConfig { return get(APP_ID, get(JobConfig.JOB_ID(), "1")); } + public String getAppClass() { + return get(APP_CLASS, null); + } + @Deprecated public String getProcessorId() { return get(PROCESSOR_ID, null); http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index dfe231e..69a41db 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -72,7 +72,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) { + public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) { OperatorSpec<TM> op = OperatorSpecs.createMapOperatorSpec( mapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); @@ -80,7 +80,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public MessageStream<M> filter(FilterFunction<M> filterFn) { + public MessageStream<M> filter(FilterFunction<? super M> filterFn) { OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec( filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); @@ -88,7 +88,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) { + public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) { OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec( flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); @@ -96,7 +96,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public void sink(SinkFunction<M> sinkFn) { + public void sink(SinkFunction<? super M> sinkFn) { SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); } @@ -110,22 +110,22 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) { - OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, - new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); + OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec( + (WindowInternal<M, K, WV>) window, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); this.registeredOperatorSpecs.add(wndOp); return wndOp.getNextStream(); } @Override - public <K, JM, RM> MessageStream<RM> join( - MessageStream<JM> otherStream, JoinFunction<K, M, JM, RM> joinFn, Duration ttl) { - MessageStreamImpl<RM> nextStream = new MessageStreamImpl<>(this.graph); + public <K, OM, TM> MessageStream<TM> join( + MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl) { + MessageStreamImpl<TM> nextStream = new MessageStreamImpl<>(this.graph); - PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn = new PartialJoinFunction<K, M, JM, RM>() { - private KeyValueStore<K, PartialJoinMessage<M>> thisStreamState; + PartialJoinFunction<K, M, OM, TM> thisPartialJoinFn = new PartialJoinFunction<K, M, OM, TM>() { + private KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> thisStreamState; @Override - public RM apply(M m, JM jm) { + public TM apply(M m, OM jm) { return joinFn.apply(m, jm); } @@ -148,21 +148,21 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } }; - PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn = new PartialJoinFunction<K, JM, M, RM>() { - private KeyValueStore<K, PartialJoinMessage<JM>> otherStreamState; + PartialJoinFunction<K, OM, M, TM> otherPartialJoinFn = new PartialJoinFunction<K, OM, M, TM>() { + private KeyValueStore<K, PartialJoinMessage<OM>> otherStreamState; @Override - public RM apply(JM om, M m) { + public TM apply(OM om, M m) { return joinFn.apply(m, om); } @Override - public K getKey(JM message) { + public K getKey(OM message) { return joinFn.getSecondKey(message); } @Override - public KeyValueStore<K, PartialJoinMessage<JM>> getState() { + public KeyValueStore<K, PartialJoinMessage<OM>> getState() { return otherStreamState; } @@ -175,7 +175,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec( thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId())); - ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs + ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs .add(OperatorSpecs.createPartialJoinOperatorSpec( otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId())); @@ -183,7 +183,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) { + public MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams) { MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph); otherStreams.add(this); @@ -193,7 +193,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor) { + public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor) { int opId = this.graph.getNextOpId(); String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId); MessageStreamImpl<M> intermediateStream = http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index a49b68e..86ce6a4 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -61,16 +61,25 @@ public class StreamGraphImpl implements StreamGraph { } @Override - public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder) { + public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder) { + if (msgBuilder == null) { + throw new IllegalArgumentException("msgBuilder can't be null for an input stream"); + } return inStreams.computeIfAbsent(runner.getStreamSpec(streamId), - streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, msgBuilder)); + streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, (BiFunction<K, V, M>) msgBuilder)); } @Override public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId, - Function<M, K> keyExtractor, Function<M, V> msgExtractor) { + Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor) { + if (keyExtractor == null) { + throw new IllegalArgumentException("keyExtractor can't be null for an output stream."); + } + if (msgExtractor == null) { + throw new IllegalArgumentException("msgExtractor can't be null for an output stream."); + } return outStreams.computeIfAbsent(runner.getStreamSpec(streamId), - streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, keyExtractor, msgExtractor)); + streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor)); } @Override @@ -95,16 +104,28 @@ public class StreamGraphImpl implements StreamGraph { * @return the intermediate {@link MessageStreamImpl} */ <K, V, M> MessageStreamImpl<M> getIntermediateStream(String streamName, - Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) { + Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor, BiFunction<? super K, ? super V, ? extends M> msgBuilder) { String streamId = String.format("%s-%s-%s", config.get(JobConfig.JOB_NAME()), config.get(JobConfig.JOB_ID(), "1"), streamName); + if (msgBuilder == null) { + throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream"); + } + + if (keyExtractor == null) { + throw new IllegalArgumentException("keyExtractor can't be null for an output stream."); + } + if (msgExtractor == null) { + throw new IllegalArgumentException("msgExtractor can't be null for an output stream."); + } + StreamSpec streamSpec = runner.getStreamSpec(streamId); IntermediateStreamInternalImpl<K, V, M> intStream = (IntermediateStreamInternalImpl<K, V, M>) inStreams .computeIfAbsent(streamSpec, - k -> new IntermediateStreamInternalImpl<>(this, streamSpec, keyExtractor, msgExtractor, msgBuilder)); + k -> new IntermediateStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor, + (Function<M, V>) msgExtractor, (BiFunction<K, V, M>) msgBuilder)); outStreams.putIfAbsent(streamSpec, intStream); return intStream; } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index e2c4b9a..0b93bbe 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -53,7 +53,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} */ public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec( - MapFunction<M, OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) { + MapFunction<? super M, ? extends OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) { return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() { @Override public Collection<OM> apply(M message) { @@ -84,7 +84,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} */ public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec( - FilterFunction<M> filterFn, MessageStreamImpl<M> nextStream, int opId) { + FilterFunction<? super M> filterFn, MessageStreamImpl<M> nextStream, int opId) { return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() { @Override public Collection<M> apply(M message) { @@ -115,8 +115,8 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} */ public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec( - FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) { - return new StreamOperatorSpec<>(transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId); + FlatMapFunction<? super M, ? extends OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) { + return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId); } /** @@ -127,8 +127,8 @@ public class OperatorSpecs { * @param <M> type of input message * @return the {@link SinkOperatorSpec} for the sink operator */ - public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, int opId) { - return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, opId); + public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, int opId) { + return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, OperatorSpec.OpCode.SINK, opId); } /** @@ -195,7 +195,7 @@ public class OperatorSpecs { public static <K, M, JM, RM> PartialJoinOperatorSpec<K, M, JM, RM> createPartialJoinOperatorSpec( PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, long ttlMs, MessageStreamImpl<RM> nextStream, int opId) { - return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId); + return new PartialJoinOperatorSpec<>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId); } /** @@ -207,7 +207,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} for the merge */ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> nextStream, int opId) { - return new StreamOperatorSpec<M, M>(message -> + return new StreamOperatorSpec<>(message -> new ArrayList<M>() { { this.add(message); http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index 3c427c7..f9bbe2d 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -45,7 +45,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec} * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph} */ - StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl nextStream, + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream, OperatorSpec.OpCode opCode, int opId) { this.transformFn = transformFn; this.nextStream = nextStream; http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java index a1bee6a..8f45f7a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java @@ -33,8 +33,8 @@ public class IntermediateStreamInternalImpl<K, V, M> extends MessageStreamImpl<M private final Function<M, V> msgExtractor; private final BiFunction<K, V, M> msgBuilder; - public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, - Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) { + public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, Function<M, K> keyExtractor, + Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) { super(graph); this.streamSpec = streamSpec; this.keyExtractor = keyExtractor; http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java index 445d13e..6408e6f 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java @@ -19,6 +19,7 @@ package org.apache.samza.task; import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.application.StreamApplication; @@ -158,19 +159,20 @@ public class TaskFactoryUtil { * @return {@link StreamApplication} instance */ public static StreamApplication createStreamApplication(Config config) { - if (config.get(StreamApplication.APP_CLASS_CONFIG) != null && !config.get(StreamApplication.APP_CLASS_CONFIG).isEmpty()) { + ApplicationConfig appConfig = new ApplicationConfig(config); + if (appConfig.getAppClass() != null && !appConfig.getAppClass().isEmpty()) { TaskConfig taskConfig = new TaskConfig(config); if (taskConfig.getTaskClass() != null && !taskConfig.getTaskClass().isEmpty()) { throw new ConfigException("High level StreamApplication API cannot be used together with low-level API using task.class."); } - String appClassName = config.get(StreamApplication.APP_CLASS_CONFIG); + String appClassName = appConfig.getAppClass(); try { Class<?> builderClass = Class.forName(appClassName); return (StreamApplication) builderClass.newInstance(); } catch (Throwable t) { String errorMsg = String.format("Failed to create StreamApplication class from the config. %s = %s", - StreamApplication.APP_CLASS_CONFIG, config.get(StreamApplication.APP_CLASS_CONFIG)); + ApplicationConfig.APP_CLASS, appConfig.getAppClass()); log.error(errorMsg, t); throw new ConfigException(errorMsg, t); } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index c55fcd0..b7f952a 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -42,6 +42,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -116,8 +118,10 @@ public class TestExecutionPlanner { * */ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null); - streamGraph.getInputStream("input1", null) + Function mockFn = mock(Function.class); + OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn); + BiFunction mockBuilder = mock(BiFunction.class); + streamGraph.getInputStream("input1", mockBuilder) .partitionBy(m -> "yes!!!").map(m -> m) .sendTo(output1); return streamGraph; @@ -137,11 +141,13 @@ public class TestExecutionPlanner { */ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null); - OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", null, null); + BiFunction msgBuilder = mock(BiFunction.class); + MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); + MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true); + MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + Function mockFn = mock(Function.class); + OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn); + OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn); m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1); m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2); http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index 9f9945b..c4ab922 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -22,6 +22,9 @@ package org.apache.samza.execution; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; + import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -101,11 +104,13 @@ public class TestJobGraphJsonGenerator { StreamManager streamManager = new StreamManager(systemAdmins); StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", null, null); - OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", null, null); + BiFunction mockBuilder = mock(BiFunction.class); + MessageStream m1 = streamGraph.getInputStream("input1", mockBuilder).map(m -> m); + MessageStream m2 = streamGraph.getInputStream("input2", mockBuilder).partitionBy(m -> "haha").filter(m -> true); + MessageStream m3 = streamGraph.getInputStream("input3", mockBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + Function mockFn = mock(Function.class); + OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", mockFn, mockFn); + OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn); m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1); m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2); http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index e815b81..44870fd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -21,8 +21,7 @@ package org.apache.samza.operators; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.operators.data.TestMessageEnvelope; -import org.apache.samza.operators.data.TestOutputMessageEnvelope; +import org.apache.samza.operators.data.*; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; @@ -36,6 +35,7 @@ import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; @@ -43,14 +43,15 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Function; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -72,7 +73,7 @@ public class TestMessageStreamImpl { assertEquals(mapOp.getNextStream(), outputStream); // assert that the transformation function is what we defined above TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class); - TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); + MessageType mockInnerTestMessage = mock(MessageType.class); when(xTestMsg.getKey()).thenReturn("test-msg-key"); when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage); when(mockInnerTestMessage.getValue()).thenReturn("123456789"); @@ -87,20 +88,74 @@ public class TestMessageStreamImpl { @Test public void testFlatMap() { MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); - Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { { + List<TestOutputMessageEnvelope> flatOuts = new ArrayList<TestOutputMessageEnvelope>() { { this.add(mock(TestOutputMessageEnvelope.class)); this.add(mock(TestOutputMessageEnvelope.class)); this.add(mock(TestOutputMessageEnvelope.class)); } }; - FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts; + final List<TestMessageEnvelope> inputMsgs = new ArrayList<>(); + FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> { + inputMsgs.add(message); + return flatOuts; + }; + MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap); + Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next(); + assertTrue(flatMapOp instanceof StreamOperatorSpec); + assertEquals(flatMapOp.getNextStream(), outputStream); + assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap); + + TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); + // assert that the transformation function is what we defined above + List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>) + ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput); + assertEquals(flatOuts, result); + assertEquals(inputMsgs.size(), 1); + assertEquals(inputMsgs.get(0), mockInput); + } + + @Test + public void testFlatMapWithRelaxedTypes() { + MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + List<TestExtOutputMessageEnvelope> flatOuts = new ArrayList<TestExtOutputMessageEnvelope>() { { + this.add(new TestExtOutputMessageEnvelope("output-key-1", 1, "output-id-001")); + this.add(new TestExtOutputMessageEnvelope("output-key-2", 2, "output-id-002")); + this.add(new TestExtOutputMessageEnvelope("output-key-3", 3, "output-id-003")); + } }; + + class MyFlatMapFunction implements FlatMapFunction<TestMessageEnvelope, TestExtOutputMessageEnvelope> { + public final List<TestMessageEnvelope> inputMsgs = new ArrayList<>(); + + @Override + public Collection<TestExtOutputMessageEnvelope> apply(TestMessageEnvelope message) { + inputMsgs.add(message); + return flatOuts; + } + + @Override + public void init(Config config, TaskContext context) { + inputMsgs.clear(); + } + } + + MyFlatMapFunction xFlatMap = new MyFlatMapFunction(); + MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap); Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); assertEquals(subs.size(), 1); OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next(); assertTrue(flatMapOp instanceof StreamOperatorSpec); assertEquals(flatMapOp.getNextStream(), outputStream); + assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap); + + TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); // assert that the transformation function is what we defined above - assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap); + List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>) + ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput); + assertEquals(flatOuts, result); + assertEquals(xFlatMap.inputMsgs.size(), 1); + assertEquals(xFlatMap.inputMsgs.get(0), mockInput); } @Test @@ -116,7 +171,7 @@ public class TestMessageStreamImpl { // assert that the transformation function is what we defined above FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn(); TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); - TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); + MessageType mockInnerTestMessage = mock(MessageType.class); when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage); when(mockInnerTestMessage.getEventTime()).thenReturn(11111L); Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg); @@ -131,8 +186,9 @@ public class TestMessageStreamImpl { @Test public void testSink() { MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + SystemStream testStream = new SystemStream("test-sys", "test-stream"); SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> { - mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); + mc.send(new OutgoingMessageEnvelope(testStream, m.getMessage())); tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); }; inputStream.sink(xSink); @@ -141,6 +197,21 @@ public class TestMessageStreamImpl { OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next(); assertTrue(sinkOp instanceof SinkOperatorSpec); assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink); + + TestMessageEnvelope mockTest1 = mock(TestMessageEnvelope.class); + MessageType mockMsgBody = mock(MessageType.class); + when(mockTest1.getMessage()).thenReturn(mockMsgBody); + final List<OutgoingMessageEnvelope> outMsgs = new ArrayList<>(); + MessageCollector mockCollector = mock(MessageCollector.class); + doAnswer(invocation -> { + outMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]); + return null; + }).when(mockCollector).send(any()); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + ((SinkOperatorSpec) sinkOp).getSinkFn().apply(mockTest1, mockCollector, mockCoordinator); + assertEquals(1, outMsgs.size()); + assertEquals(testStream, outMsgs.get(0).getSystemStream()); + assertEquals(mockMsgBody, outMsgs.get(0).getMessage()); } @Test @@ -189,14 +260,14 @@ public class TestMessageStreamImpl { @Test public void testMerge() { MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph); - Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { { + Collection<MessageStream<? extends TestMessageEnvelope>> others = new ArrayList<MessageStream<? extends TestMessageEnvelope>>() { { this.add(new MessageStreamImpl<>(mockGraph)); this.add(new MessageStreamImpl<>(mockGraph)); } }; MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others); validateMergeOperator(merge1, mergeOutput); - others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); + others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput)); } private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) { http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java new file mode 100644 index 0000000..3ab1a3c --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.data.MessageType; +import org.apache.samza.operators.data.TestInputMessageEnvelope; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.stream.InputStreamInternalImpl; +import org.apache.samza.operators.stream.IntermediateStreamInternalImpl; +import org.apache.samza.operators.stream.OutputStreamInternalImpl; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestStreamGraphImpl { + + @Test + public void testGetInputStream() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system"); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder = + (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1"); + MessageStream<TestMessageEnvelope> mInputStream = graph.getInputStream("test-stream-1", xMsgBuilder); + assertEquals(graph.getInputStreams().get(testStreamSpec), mInputStream); + assertTrue(mInputStream instanceof InputStreamInternalImpl); + assertEquals(((InputStreamInternalImpl) mInputStream).getMsgBuilder(), xMsgBuilder); + + String key = "test-input-key"; + MessageType msgBody = new MessageType("test-msg-value", 333333L); + TestMessageEnvelope xInputMsg = ((InputStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mInputStream). + getMsgBuilder().apply(key, msgBody); + assertEquals(xInputMsg.getKey(), key); + assertEquals(xInputMsg.getMessage().getValue(), msgBody.getValue()); + assertEquals(xInputMsg.getMessage().getEventTime(), msgBody.getEventTime()); + assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1"); + } + + @Test + public void testGetOutputStream() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system"); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec); + + class MyMessageType extends MessageType { + public final String outputId; + + public MyMessageType(String value, long eventTime, String outputId) { + super(value, eventTime); + this.outputId = outputId; + } + } + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey(); + Function<TestMessageEnvelope, MyMessageType> xMsgExtractor = + x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1"); + + OutputStream<String, MyMessageType, TestInputMessageEnvelope> mOutputStream = + graph.getOutputStream("test-stream-1", xKeyExtractor, xMsgExtractor); + assertEquals(graph.getOutputStreams().get(testStreamSpec), mOutputStream); + assertTrue(mOutputStream instanceof OutputStreamInternalImpl); + assertEquals(((OutputStreamInternalImpl) mOutputStream).getKeyExtractor(), xKeyExtractor); + assertEquals(((OutputStreamInternalImpl) mOutputStream).getMsgExtractor(), xMsgExtractor); + + TestInputMessageEnvelope xInputMsg = new TestInputMessageEnvelope("test-key-1", "test-msg-1", 33333L, "input-id-1"); + assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream). + getKeyExtractor().apply(xInputMsg), "test-key-1"); + assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream). + getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1"); + assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream). + getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L); + assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream). + getMsgExtractor().apply(xInputMsg).outputId, "test-output-id-1"); + } + + @Test + public void testWithContextManager() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + // ensure that default is noop + TaskContext mockContext = mock(TaskContext.class); + assertEquals(graph.getContextManager().initTaskContext(mockConfig, mockContext), mockContext); + + ContextManager testContextManager = new ContextManager() { + @Override + public TaskContext initTaskContext(Config config, TaskContext context) { + return null; + } + + @Override + public void finalizeTaskContext() { + + } + }; + + graph.withContextManager(testContextManager); + assertEquals(graph.getContextManager(), testContextManager); + } + + @Test + public void testGetIntermediateStream() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec testStreamSpec = new StreamSpec("myJob-i001-test-stream-1", "physical-stream-1", "test-system"); + when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(testStreamSpec); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); + when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); + + class MyMessageType extends MessageType { + public final String outputId; + + public MyMessageType(String value, long eventTime, String outputId) { + super(value, eventTime); + this.outputId = outputId; + } + } + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey(); + Function<TestMessageEnvelope, MyMessageType> xMsgExtractor = + x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1"); + BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder = + (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1"); + + MessageStream<TestMessageEnvelope> mIntermediateStream = + graph.getIntermediateStream("test-stream-1", xKeyExtractor, xMsgExtractor, xMsgBuilder); + assertEquals(graph.getOutputStreams().get(testStreamSpec), mIntermediateStream); + assertTrue(mIntermediateStream instanceof IntermediateStreamInternalImpl); + assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getKeyExtractor(), xKeyExtractor); + assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgExtractor(), xMsgExtractor); + assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgBuilder(), xMsgBuilder); + + TestMessageEnvelope xInputMsg = new TestMessageEnvelope("test-key-1", "test-msg-1", 33333L); + assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream). + getKeyExtractor().apply(xInputMsg), "test-key-1"); + assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream). + getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1"); + assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream). + getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L); + assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream). + getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getKey(), "test-key-1"); + assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream). + getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getValue(), "test-msg-1"); + assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream). + getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getEventTime(), 33333L); + } + + @Test + public void testGetNextOpId() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + assertEquals(graph.getNextOpId(), 0); + assertEquals(graph.getNextOpId(), 1); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java b/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java new file mode 100644 index 0000000..3fd015b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +public class MessageType { + private final String value; + private final long eventTime; + + public MessageType(String value, long eventTime) { + this.value = value; + this.eventTime = eventTime; + } + + public long getEventTime() { + return eventTime; + } + + public String getValue() { + return value; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java new file mode 100644 index 0000000..22222ed --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +public class TestExtOutputMessageEnvelope extends TestOutputMessageEnvelope { + private final String outputId; + + public TestExtOutputMessageEnvelope(String key, Integer value, String outputId) { + super(key, value); + this.outputId = outputId; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java new file mode 100644 index 0000000..089f534 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +public class TestInputMessageEnvelope extends TestMessageEnvelope { + private final String inputId; + + public TestInputMessageEnvelope(String key, String value, long eventTime, String inputId) { + super(key, value, eventTime); + this.inputId = inputId; + } + + public String getInputId() { + return this.inputId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java index 2524c28..05a63cd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java +++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java @@ -37,21 +37,4 @@ public class TestMessageEnvelope { return this.key; } - public class MessageType { - private final String value; - private final long eventTime; - - public MessageType(String value, long eventTime) { - this.value = value; - this.eventTime = eventTime; - } - - public long getEventTime() { - return eventTime; - } - - public String getValue() { - return value; - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java index 37e3d1a..d227206 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -21,6 +21,8 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.TestMessageStreamImplUtil; +import org.apache.samza.operators.data.MessageType; +import org.apache.samza.operators.data.TestInputMessageEnvelope; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; @@ -31,39 +33,71 @@ import org.apache.samza.operators.stream.OutputStreamInternalImpl; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.function.Function; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestOperatorSpecs { @Test public void testCreateStreamOperator() { - FlatMapFunction<?, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { + FlatMapFunction<Object, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L)); } }; MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class); - StreamOperatorSpec<?, TestMessageEnvelope> streamOp = + StreamOperatorSpec<Object, TestMessageEnvelope> streamOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1); assertEquals(streamOp.getTransformFn(), transformFn); + + Object mockInput = mock(Object.class); + when(mockInput.toString()).thenReturn("test-string-1"); + List<TestMessageEnvelope> outputs = (List<TestMessageEnvelope>) streamOp.getTransformFn().apply(mockInput); + assertEquals(outputs.size(), 1); + assertEquals(outputs.get(0).getKey(), "test-string-1"); + assertEquals(outputs.get(0).getMessage().getValue(), "test-string-1"); + assertEquals(outputs.get(0).getMessage().getEventTime(), 12345L); assertEquals(streamOp.getNextStream(), mockOutput); } @Test public void testCreateSinkOperator() { + SystemStream testStream = new SystemStream("test-sys", "test-stream"); SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector, - TaskCoordinator taskCoordinator) -> { }; + TaskCoordinator taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(testStream, message.getKey(), message.getMessage())); + }; SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1); assertEquals(sinkOp.getSinkFn(), sinkFn); + + TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); + when(mockInput.getKey()).thenReturn("my-test-msg-key"); + MessageType mockMsgBody = mock(MessageType.class); + when(mockInput.getMessage()).thenReturn(mockMsgBody); + final List<OutgoingMessageEnvelope> outputMsgs = new ArrayList<>(); + MessageCollector mockCollector = mock(MessageCollector.class); + doAnswer(invocation -> { + outputMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]); + return null; + }).when(mockCollector).send(any()); + sinkOp.getSinkFn().apply(mockInput, mockCollector, null); + assertEquals(1, outputMsgs.size()); + assertEquals(outputMsgs.get(0).getKey(), "my-test-msg-key"); + assertEquals(outputMsgs.get(0).getMessage(), mockMsgBody); assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK); assertEquals(sinkOp.getNextStream(), null); } @@ -104,6 +138,27 @@ public class TestOperatorSpecs { } @Test + public void testCreateWindowOperatorWithRelaxedTypes() throws Exception { + Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey(); + FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1; + Supplier<Integer> initialValue = () -> 0; + //instantiate a window using reflection + WindowInternal<TestInputMessageEnvelope, String, Integer> window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING); + + MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class); + WindowOperatorSpec spec = + OperatorSpecs.createWindowOperatorSpec(window, mockWndOut, 1); + assertEquals(spec.getWindow(), window); + assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor); + assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator); + + // make sure that the functions with relaxed types work as expected + TestInputMessageEnvelope inputMsg = new TestInputMessageEnvelope("test-input-key1", "test-value-1", 23456L, "input-id-1"); + assertEquals("test-input-key1", spec.getWindow().getKeyExtractor().apply(inputMsg)); + assertEquals(1, spec.getWindow().getFoldLeftFunction().apply(inputMsg, 0)); + } + + @Test public void testCreatePartialJoinOperator() { PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn = mock(PartialJoinFunction.class); http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java index 0b051e8..e300996 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java @@ -19,6 +19,7 @@ package org.apache.samza.task; import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.MapConfig; @@ -74,7 +75,7 @@ public class TestTaskFactoryUtil { public void testCreateStreamApplication() throws Exception { Config config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); } }); StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); @@ -85,7 +86,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); } }); try { @@ -97,7 +98,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, "no.such.class"); + this.put(ApplicationConfig.APP_CLASS, "no.such.class"); } }); try { @@ -109,7 +110,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, ""); + this.put(ApplicationConfig.APP_CLASS, ""); } }); streamApp = TaskFactoryUtil.createStreamApplication(config); @@ -124,7 +125,7 @@ public class TestTaskFactoryUtil { public void testCreateStreamApplicationWithTaskClass() throws Exception { Config config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); } }); StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); @@ -133,7 +134,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); } }); try { @@ -146,7 +147,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { this.put("task.class", "no.such.class"); - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); } }); try { @@ -162,7 +163,7 @@ public class TestTaskFactoryUtil { Config config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); } }); try { @@ -175,7 +176,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); - this.put(StreamApplication.APP_CLASS_CONFIG, ""); + this.put(ApplicationConfig.APP_CLASS, ""); } }); StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); @@ -186,7 +187,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { this.put("task.class", ""); - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); } }); try { @@ -226,7 +227,7 @@ public class TestTaskFactoryUtil { Config config = new MapConfig(new HashMap<String, String>() { { - this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamApplication"); + this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); } }); try { @@ -239,7 +240,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); - this.put(StreamApplication.APP_CLASS_CONFIG, ""); + this.put(ApplicationConfig.APP_CLASS, ""); } }); StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); @@ -250,7 +251,7 @@ public class TestTaskFactoryUtil { config = new MapConfig(new HashMap<String, String>() { { this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); - this.put(StreamApplication.APP_CLASS_CONFIG, null); + this.put(ApplicationConfig.APP_CLASS, null); } }); streamApp = TaskFactoryUtil.createStreamApplication(config); http://git-wip-us.apache.org/repos/asf/samza/blob/c91da784/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index cda2690..29fb6d3 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -119,7 +119,7 @@ object StreamTaskTestUtil { servers = configs.map(TestUtils.createServer(_)).toBuffer val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT) - brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") + brokers = brokerList.split(",").map(p => "127.0.0.1" + p).mkString(",") // setup the zookeeper and bootstrap servers for local kafka cluster jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
