SAMZA-1073: Addressing review feedback. Rename StreamGraphFactory to StreamGraphBuilder.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4bde68b4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4bde68b4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4bde68b4 Branch: refs/heads/samza-fluent-api-v1 Commit: 4bde68b4a542ac465e4f536bc548c2d6c4366fae Parents: b3dd886 Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Tue Feb 14 01:33:23 2017 -0800 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Wed Feb 15 15:13:09 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/operators/MessageStream.java | 16 +-- .../org/apache/samza/operators/StreamGraph.java | 24 ---- .../samza/operators/StreamGraphBuilder.java | 38 ++++++ .../samza/operators/StreamGraphFactory.java | 38 ------ .../operators/functions/FilterFunction.java | 12 +- .../operators/functions/FlatMapFunction.java | 12 +- .../samza/operators/functions/InitFunction.java | 38 ------ .../operators/functions/InitableFunction.java | 40 ++++++ .../samza/operators/functions/JoinFunction.java | 11 +- .../functions/KeyValueJoinFunction.java | 44 ------- .../samza/operators/functions/MapFunction.java | 12 +- .../samza/operators/functions/SinkFunction.java | 14 +- .../apache/samza/operators/windows/Window.java | 2 +- .../samza/system/ExecutionEnvironment.java | 6 +- .../java/org/apache/samza/task/TaskContext.java | 2 +- .../samza/operators/windows/TestWindowPane.java | 2 +- .../samza/operators/MessageStreamImpl.java | 5 - .../functions/PartialJoinFunction.java | 11 +- .../system/RemoteExecutionEnvironment.java | 37 ++++++ .../system/SingleJobExecutionEnvironment.java | 37 ------ .../system/StandaloneExecutionEnvironment.java | 19 ++- .../apache/samza/task/StreamOperatorTask.java | 25 ++-- .../apache/samza/example/BroadcastGraph.java | 121 ----------------- .../org/apache/samza/example/JoinGraph.java | 118 ----------------- 
.../samza/example/KeyValueStoreExample.java | 14 +- .../samza/example/NoContextStreamExample.java | 15 +-- .../samza/example/OrderShipmentJoinExample.java | 8 +- .../samza/example/PageViewCounterExample.java | 14 +- .../samza/example/RepartitionExample.java | 15 +-- .../samza/example/TestBasicStreamGraphs.java | 99 ++++++++++++++ .../samza/example/TestBroadcastExample.java | 113 ++++++++++++++++ .../apache/samza/example/TestExampleBase.java | 46 +++++++ .../samza/example/TestFluentStreamTasks.java | 99 -------------- .../apache/samza/example/TestJoinExample.java | 129 +++++++++++++++++++ .../apache/samza/example/TestWindowExample.java | 81 ++++++++++++ .../org/apache/samza/example/WindowGraph.java | 87 ------------- .../samza/operators/TestMessageStreamImpl.java | 19 ++- .../samza/operators/impl/TestOperatorImpls.java | 21 ++- 38 files changed, 687 insertions(+), 757 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index 87a9fd3..adeb4c8 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -42,7 +42,7 @@ import java.util.function.Function; public interface MessageStream<M> { /** - * Applies the provided 1:1 {@link Function} to messages in this {@link MessageStream} and returns the + * Applies the provided 1:1 function to messages in this {@link MessageStream} and returns the * transformed {@link MessageStream}. 
* * @param mapFn the function to transform a message to another message @@ -52,7 +52,7 @@ public interface MessageStream<M> { <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn); /** - * Applies the provided 1:n {@link Function} to transform a message in this {@link MessageStream} + * Applies the provided 1:n function to transform a message in this {@link MessageStream} * to n messages in the transformed {@link MessageStream} * * @param flatMapFn the function to transform a message to zero or more messages @@ -62,7 +62,7 @@ public interface MessageStream<M> { <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn); /** - * Applies the provided {@link Function} to messages in this {@link MessageStream} and returns the + * Applies the provided function to messages in this {@link MessageStream} and returns the * transformed {@link MessageStream}. * <p> * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream} @@ -92,16 +92,6 @@ public interface MessageStream<M> { void sendTo(OutputStream<M> stream); /** - * Allows sending messages to an intermediate {@link MessageStream}. - * - * NOTE: the {@code stream} has to be a {@link MessageStream}. - * - * @param stream the intermediate {@link MessageStream} to send the message to - * @return the intermediate {@link MessageStream} to consume the messages sent - */ - MessageStream<M> sendThrough(OutputStream<M> stream); - - /** * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of * {@link WindowPane}s. 
http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java index 9e6644b..abc9861 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -19,8 +19,6 @@ package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.serializers.Serde; @@ -93,26 +91,4 @@ public interface StreamGraph { */ StreamGraph withContextManager(ContextManager manager); - String GRAPH_CONFIG = "job.stream.graph.impl.class"; - String DEFAULT_GRAPH_IMPL_CLASS = "org.apache.samza.operators.StreamGraphImpl"; - - /** - * Static method to instantiate the implementation class of {@link StreamGraph}. 
- * - * @param config the {@link Config} object for this job - * @return the {@link StreamGraph} object created - */ - static StreamGraph fromConfig(Config config) { - - try { - if (StreamGraph.class.isAssignableFrom(Class.forName(config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS)))) { - return (StreamGraph) Class.forName(config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS)).newInstance(); - } - } catch (Exception e) { - throw new ConfigException(String.format("Problem in loading StreamGraphImpl class %s", config.get(GRAPH_CONFIG)), e); - } - throw new ConfigException(String.format( - "Class %s does not implement interface StreamGraphBuilder properly", - config.get(GRAPH_CONFIG))); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java new file mode 100644 index 0000000..b415cf8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; + + +/** + * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object. + */ +@InterfaceStability.Unstable +public interface StreamGraphBuilder { + /** + * Users are required to implement this abstract method to initialize the processing logic of the application, in terms + * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators + * + * @param graph an empty {@link StreamGraph} object to be initialized + * @param config the {@link Config} of the application + */ + void init(StreamGraph graph, Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java deleted file mode 100644 index c292363..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; - - -/** - * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object. - */ -@InterfaceStability.Unstable -public interface StreamGraphFactory { - /** - * Users are required to implement this abstract method to initialize the processing logic of the application, in terms - * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators - * - * @param config the {@link Config} of the application - * @return the {@link StreamGraph} object which contains user-defined processing logic of the application - */ - StreamGraph create(Config config); -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java index 73c5c9d..58479d6 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java @@ -19,8 +19,6 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; 
/** @@ -28,7 +26,8 @@ import org.apache.samza.task.TaskContext; * @param <M> type of the input message */ @InterfaceStability.Unstable -public interface FilterFunction<M> extends InitFunction { +@FunctionalInterface +public interface FilterFunction<M> extends InitableFunction { /** * Returns a boolean indicating whether this message should be retained or filtered out. @@ -37,11 +36,4 @@ public interface FilterFunction<M> extends InitFunction { */ boolean apply(M message); - /** - * Init method to initialize the context for this {@link FilterFunction}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java index f8458f2..bbbddeb 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java @@ -19,8 +19,6 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; import java.util.Collection; @@ -32,7 +30,8 @@ import java.util.Collection; * @param <OM> type of the transformed messages */ @InterfaceStability.Unstable -public interface FlatMapFunction<M, OM> extends InitFunction { +@FunctionalInterface +public interface FlatMapFunction<M, OM> extends InitableFunction { /** * Transforms the provided message into a collection of 0 or more messages. 
@@ -41,11 +40,4 @@ public interface FlatMapFunction<M, OM> extends InitFunction { */ Collection<OM> apply(M message); - /** - * Init method to initialize the context for this {@link FlatMapFunction}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { }; } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java deleted file mode 100644 index eec56df..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.functions; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; - - -/** - * interface defined to initalize the context of message transformation functions - */ -@InterfaceStability.Unstable -public interface InitFunction { - /** - * Interface method to initialize the context for a specific message transformation function. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - void init(Config config, TaskContext context); -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java new file mode 100644 index 0000000..2f738da --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.task.TaskContext; + + +/** + * interface defined to initalize the context of message transformation functions + */ +@InterfaceStability.Unstable +public interface InitableFunction { + + /** + * Interface method to initialize the context for a specific message transformation function. + * + * @param config the {@link Config} object for this task + * @param context the {@link TaskContext} object for this task + */ + default void init(Config config, TaskContext context) { } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java index afc92ee..fc38177 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java @@ -19,8 +19,6 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; /** @@ -32,7 +30,7 @@ import org.apache.samza.task.TaskContext; * @param <RM> type of the joined message */ @InterfaceStability.Unstable -public interface JoinFunction<K, M, JM, RM> extends InitFunction { +public interface JoinFunction<K, M, JM, RM> extends InitableFunction { /** * Join the provided input messages and produces the joined messages. 
@@ -58,11 +56,4 @@ public interface JoinFunction<K, M, JM, RM> extends InitFunction { */ K getSecondKey(JM message); - /** - * Init method to initialize the context for this {@link JoinFunction}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java deleted file mode 100644 index b651b3d..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.functions; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; - -/** - * A specific {@link JoinFunction} that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces - * a joined message. - * - * @param <K> type of the join key - * @param <M> type of the input {@link MessageEnvelope} - * @param <JM> type of the {@link MessageEnvelope} to join with - * @param <RM> type of the joined message - */ -@InterfaceStability.Unstable -@FunctionalInterface -public interface KeyValueJoinFunction<K, M extends MessageEnvelope<K, ?>, JM extends MessageEnvelope<K, ?>, RM> extends JoinFunction<K, M, JM, RM> { - - default K getFirstKey(M message) { - return message.getKey(); - } - - default K getSecondKey(JM message) { - return message.getKey(); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java index a051914..05a554f 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java @@ -19,8 +19,6 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; /** @@ -29,7 +27,8 @@ import org.apache.samza.task.TaskContext; * @param <OM> type of the transformed message */ @InterfaceStability.Unstable -public interface MapFunction<M, OM> extends InitFunction { +@FunctionalInterface +public interface MapFunction<M, OM> extends InitableFunction { /** * Transforms the provided 
message into another message @@ -38,11 +37,4 @@ public interface MapFunction<M, OM> extends InitFunction { */ OM apply(M message); - /** - * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java index 1050771..08e090a 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java @@ -19,9 +19,7 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -30,7 +28,8 @@ import org.apache.samza.task.TaskCoordinator; * @param <M> type of the input message */ @InterfaceStability.Unstable -public interface SinkFunction<M> extends InitFunction { +@FunctionalInterface +public interface SinkFunction<M> extends InitableFunction { /** * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using @@ -38,16 +37,9 @@ public interface SinkFunction<M> extends InitFunction { * or shut the container down. 
* * @param message the input message to be sent to an output {@link org.apache.samza.system.SystemStream} - * @param messageCollector the {@link MessageCollector} to use to send the {@link org.apache.samza.operators.data.MessageEnvelope} + * @param messageCollector the {@link MessageCollector} to send the {@link org.apache.samza.operators.data.MessageEnvelope} * @param taskCoordinator the {@link TaskCoordinator} to request commits or shutdown */ void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator); - /** - * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java index 8aa665a..9609292 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java @@ -29,7 +29,7 @@ import org.apache.samza.operators.triggers.Trigger; * that determine when results from the {@link Window} are emitted. * * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}. - * A pane can include all messagess collected for the window so far or only the new messages + * A pane can include all messages collected for the window so far or only the new messages * since the last emitted pane. 
(as determined by the {@link AccumulationMode}) * * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java index d0c5985..ad37eb3 100644 --- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java +++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java @@ -20,7 +20,7 @@ package org.apache.samza.system; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.ConfigException; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; @@ -65,9 +65,9 @@ public interface ExecutionEnvironment { /** * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph} * - * @param graphFactory the user-defined {@link StreamGraphFactory} object + * @param graphBuilder the user-defined {@link StreamGraphBuilder} object * @param config the {@link Config} object for this job */ - void run(StreamGraphFactory graphFactory, Config config); + void run(StreamGraphBuilder graphBuilder, Config config); } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 5779071..128cff1 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ 
b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -61,7 +61,7 @@ public interface TaskContext { * @param <T> the type of user-defined task context * @return user-defined task context object */ - default <T extends TaskContext> T getUserDefinedContext() { + default <T> T getUserDefinedContext() { return null; }; } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java index 809c5b4..54d0b2f 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java @@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals; public class TestWindowPane { @Test public void testConstructor() { - WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10); + WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10); assertEquals(wndOutput.getKey().getKey(), "testMsg"); assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index d85d488..830e4a5 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -93,11 +93,6 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> { this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream)); } - @Override public MessageStream<M> sendThrough(OutputStream<M> stream) { - this.sendTo(stream); - return this.graph.getIntStream(stream); - } - @Override public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) { OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java index 3583b92..809a70a 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java @@ -19,8 +19,6 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.task.TaskContext; /** @@ -28,7 +26,7 @@ import org.apache.samza.task.TaskContext; * {@link org.apache.samza.operators.MessageStream}s and merge them into a single output joined message in the join output */ @InterfaceStability.Unstable -public interface PartialJoinFunction<K, M, OM, RM> extends InitFunction { +public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction { /** * Method to perform join method on the two input messages @@ -55,11 +53,4 @@ public interface PartialJoinFunction<K, M, OM, RM> extends InitFunction { */ K getOtherKey(OM message); - /** - * Init method to initialize the context for this {@link PartialJoinFunction}. 
The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java new file mode 100644 index 0000000..fafa2cb --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system; + +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; + +/** + * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment + */ +public class RemoteExecutionEnvironment implements ExecutionEnvironment { + + @Override public void run(StreamGraphBuilder app, Config config) { + // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph} + // TODO: actually instantiate the tasks and run the job, i.e. + // 1. create all input/output/intermediate topics + // 2. create the single job configuration + // 3. execute JobRunner to submit the single job for the whole graph + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java deleted file mode 100644 index 60a4c60..0000000 --- a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.system; - -import org.apache.samza.operators.StreamGraphFactory; -import org.apache.samza.config.Config; - -/** - * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment - */ -public class SingleJobExecutionEnvironment implements ExecutionEnvironment { - - @Override public void run(StreamGraphFactory app, Config config) { - // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph} - // TODO: actually instantiate the tasks and run the job, i.e. - // 1. create all input/output/intermediate topics - // 2. create the single job configuration - // 3. 
execute JobRunner to submit the single job for the whole graph - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java index f60ff82..f0f6ef2 100644 --- a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java +++ b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java @@ -19,8 +19,10 @@ package org.apache.samza.system; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraphImpl; /** @@ -28,14 +30,21 @@ import org.apache.samza.config.Config; */ public class StandaloneExecutionEnvironment implements ExecutionEnvironment { - @Override public void run(StreamGraphFactory app, Config config) { + // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment} + StreamGraph createGraph(StreamGraphBuilder app, Config config) { + StreamGraphImpl graph = new StreamGraphImpl(); + app.init(graph, config); + return graph; + } + + @Override public void run(StreamGraphBuilder app, Config config) { // 1. get logic graph for optimization - // StreamGraph logicGraph = app.create(config); + // StreamGraph logicGraph = this.createGraph(app, config); // 2. potential optimization.... - // 3. create new instance of StreamGraphFactory that would generate the optimized graph + // 3. create new instance of StreamGraphBuilder that would generate the optimized graph // 4. create all input/output/intermediate topics // 5. 
create the configuration for StreamProcessor - // 6. start the StreamProcessor w/ optimized instance of StreamGraphFactory + // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java index fa7ec5e..b007e3c 100644 --- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -21,7 +21,7 @@ package org.apache.samza.task; import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.impl.OperatorGraph; @@ -43,9 +43,9 @@ import java.util.Map; * This class brings all the operator API implementation components together and feeds the * {@link InputMessageEnvelope}s into the transformation chains. * <p> - * It accepts an instance of the user implemented factory {@link StreamGraphFactory} as input parameter of the constructor. + * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor. 
* When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl} - * from the {@link StreamGraphFactory}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context + * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl} * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}. @@ -67,27 +67,30 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo */ private final OperatorGraph operatorGraph = new OperatorGraph(); - private final StreamGraphFactory graphFactory; + private final StreamGraphBuilder graphBuilder; - private ContextManager taskManager; + private ContextManager contextManager; - public StreamOperatorTask(StreamGraphFactory graphFactory) { - this.graphFactory = graphFactory; + public StreamOperatorTask(StreamGraphBuilder graphBuilder) { + this.graphBuilder = graphBuilder; } @Override public final void init(Config config, TaskContext context) throws Exception { // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task - StreamGraphImpl streams = (StreamGraphImpl) this.graphFactory.create(config); - this.taskManager = streams.getContextManager(); + StreamGraphImpl streams = new StreamGraphImpl(); + this.graphBuilder.init(streams, config); + // get the context manager of the {@link StreamGraph} and initialize the task-specific context + this.contextManager = streams.getContextManager(); Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>(); context.getSystemStreamPartitions().forEach(ssp -> { if (!inputBySystemStream.containsKey(ssp.getSystemStream())) { + // create mapping from 
the physical input {@link SystemStream} to the logic {@link MessageStream} inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream())); } }); - operatorGraph.init(inputBySystemStream, config, this.taskManager.initTaskContext(config, context)); + operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context)); } @Override @@ -103,6 +106,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo @Override public void close() throws Exception { - this.taskManager.finalizeTaskContext(); + this.contextManager.finalizeTaskContext(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java deleted file mode 100644 index a91ce09..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamGraphFactory; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.function.BiFunction; -import java.util.Properties; -import java.util.Set; - - -/** - * Example implementation of split stream tasks - * - */ -public class BroadcastGraph implements StreamGraphFactory { - - private final Set<SystemStreamPartition> inputs; - - BroadcastGraph(Set<SystemStreamPartition> inputs) { - this.inputs = inputs; - } - - class MessageType { - String field1; - String field2; - String field3; - String field4; - String parKey; - private long timestamp; - - public long getTimestamp() { - return this.timestamp; - } - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - public StreamGraph create(Config config) { - StreamGraphImpl graph = new StreamGraphImpl(); - - BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; - inputs.forEach(entry -> { - MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return entry.getSystemStream(); - } - - 
@Override public Properties getProperties() { - return null; - } - }, null, null). - map(this::getInputMessage); - - inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - }); - return graph; - } - - JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) { - return (JsonMessageEnvelope) m1.getMessage(); - } - - boolean myFilter1(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key1"); - } - - boolean myFilter2(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key2"); - } - - boolean myFilter3(JsonMessageEnvelope m1) { - return m1.getMessage().parKey.equals("key3"); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java deleted file mode 100644 index 2313f63..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamGraphFactory; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.functions.KeyValueJoinFunction; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.Set; - - -/** - * Example implementation of unique key-based stream-stream join tasks - * - */ -public class JoinGraph implements StreamGraphFactory { - private final Set<SystemStreamPartition> inputs; - - JoinGraph(Set<SystemStreamPartition> inputs) { - this.inputs = inputs; - } - - class MessageType { - String joinKey; - List<String> joinFields = new ArrayList<>(); - } - - class JsonMessageEnvelope 
extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - MessageStream<JsonMessageEnvelope> joinOutput = null; - - @Override - public StreamGraph create(Config config) { - StreamGraphImpl graph = new StreamGraphImpl(); - - for (SystemStreamPartition input : inputs) { - MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream( - new StreamSpec() { - @Override public SystemStream getSystemStream() { - return input.getSystemStream(); - } - - @Override public Properties getProperties() { - return null; - } - }, null, null).map(this::getInputMessage); - if (joinOutput == null) { - joinOutput = newSource; - } else { - joinOutput = joinOutput.join(newSource, - (KeyValueJoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope>) this::myJoinResult); - } - } - - joinOutput.sendTo(graph.createOutStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return null; - } - - @Override public Properties getProperties() { - return null; - } - }, new StringSerde("UTF-8"), new JsonSerde<>())); - - return graph; - } - - private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { - return new JsonMessageEnvelope( - ((MessageType) ism.getMessage()).joinKey, - (MessageType) ism.getMessage(), - ism.getOffset(), - ism.getSystemStreamPartition()); - } - - JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { - MessageType newJoinMsg = new MessageType(); - newJoinMsg.joinKey = m1.getKey(); - newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); - newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); - return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); - } -} 
http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java index ad6336a..85ebc6c 100644 --- a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java @@ -22,12 +22,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphFactory; + +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.serializers.JsonSerde; @@ -44,7 +42,7 @@ import java.util.Properties; /** * Example code using {@link KeyValueStore} to implement event-time window */ -public class KeyValueStoreExample implements StreamGraphFactory { +public class KeyValueStoreExample implements StreamGraphBuilder { /** * used by remote execution environment to launch the job in remote program. 
The remote program should follow the similar @@ -59,8 +57,7 @@ public class KeyValueStoreExample implements StreamGraphFactory { * } * */ - @Override public StreamGraph create(Config config) { - StreamGraph graph = StreamGraph.fromConfig(config); + @Override public void init(StreamGraph graph, Config config) { MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>()); @@ -70,7 +67,6 @@ public class KeyValueStoreExample implements StreamGraphFactory { flatMap(new MyStatsCounter()). sendTo(pageViewPerMemberCounters); - return graph; } // standalone local program model http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java index 577d06f..c6d2e6e 100644 --- a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java +++ b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java @@ -18,12 +18,9 @@ */ package org.apache.samza.example; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.Offset; @@ 
-41,9 +38,9 @@ import java.util.Properties; /** - * Example {@link StreamGraphFactory} code to test the API methods + * Example {@link StreamGraphBuilder} code to test the API methods */ -public class NoContextStreamExample implements StreamGraphFactory { +public class NoContextStreamExample implements StreamGraphBuilder { StreamSpec input1 = new StreamSpec() { @Override public SystemStream getSystemStream() { @@ -129,8 +126,7 @@ public class NoContextStreamExample implements StreamGraphFactory { * } * */ - @Override public StreamGraph create(Config config) { - StreamGraph graph = StreamGraph.fromConfig(config); + @Override public void init(StreamGraph graph, Config config) { MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream( input1, null, null); MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream( @@ -142,7 +138,6 @@ public class NoContextStreamExample implements StreamGraphFactory { join(inputSource2.map(this::getInputMessage), new MyJoinFunction()). 
sendTo(outStream); - return graph; } // standalone local program model http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java index ad433b6..0477066 100644 --- a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -20,7 +20,7 @@ package org.apache.samza.example; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.StreamSpec; @@ -38,7 +38,7 @@ import java.util.Properties; /** * Simple 2-way stream-to-stream join example */ -public class OrderShipmentJoinExample implements StreamGraphFactory { +public class OrderShipmentJoinExample implements StreamGraphBuilder { /** * used by remote execution environment to launch the job in remote program. 
The remote program should follow the similar @@ -53,8 +53,7 @@ public class OrderShipmentJoinExample implements StreamGraphFactory { * } * */ - @Override public StreamGraph create(Config config) { - StreamGraph graph = StreamGraph.fromConfig(config); + @Override public void init(StreamGraph graph, Config config) { MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>()); @@ -62,7 +61,6 @@ public class OrderShipmentJoinExample implements StreamGraphFactory { orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders); - return graph; } // standalone local program model http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java index 1502aa2..f7d8bda 100644 --- a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -18,12 +18,9 @@ */ package org.apache.samza.example; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.AccumulationMode; @@ -42,10 +39,9 @@ import 
java.util.Properties; /** * Example code to implement window-based counter */ -public class PageViewCounterExample implements StreamGraphFactory { +public class PageViewCounterExample implements StreamGraphBuilder { - @Override public StreamGraph create(Config config) { - StreamGraph graph = StreamGraph.fromConfig(config); + @Override public void init(StreamGraph graph, Config config) { MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); @@ -56,7 +52,7 @@ public class PageViewCounterExample implements StreamGraphFactory { setAccumulationMode(AccumulationMode.DISCARDING)). map(MyStreamOutput::new). sendTo(pageViewPerMemberCounters); - return graph; + } public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java index f15e514..6994ac4 100644 --- a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -18,12 +18,9 @@ */ package org.apache.samza.example; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.MessageEnvelope; import 
org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; @@ -38,9 +35,9 @@ import java.util.*; /** - * Example {@link StreamGraphFactory} code to test the API methods with re-partition operator + * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator */ -public class RepartitionExample implements StreamGraphFactory { +public class RepartitionExample implements StreamGraphBuilder { /** * used by remote execution environment to launch the job in remote program. The remote program should follow the similar @@ -54,8 +51,7 @@ public class RepartitionExample implements StreamGraphFactory { * } * */ - @Override public StreamGraph create(Config config) { - StreamGraph graph = StreamGraph.fromConfig(config); + @Override public void init(StreamGraph graph, Config config) { MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); @@ -67,7 +63,6 @@ public class RepartitionExample implements StreamGraphFactory { map(MyStreamOutput::new). sendTo(pageViewPerMemberCounters); - return graph; } // standalone local program model http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java new file mode 100644 index 0000000..8ecd44f --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import java.lang.reflect.Field; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.OperatorGraph; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * Unit test for {@link StreamOperatorTask}. Each test wraps one example class in a StreamOperatorTask, reads the task's operatorGraph field through reflection, calls init with a mocked Config and TaskContext, and asserts that the graph returns a non-null value for the SystemStream of every input partition. + */ +public class TestBasicStreamGraphs { + + /* Four input partitions: for i = 0..3, partition i of stream my-topic<i> on system my-system. */ private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { + for (int i = 0; i < 4; i++) { + this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i))); + } + } }; + + /** Runs the check for TestWindowExample. The operatorGraph reference is read before init() is called, so the assertions assume init() populates that same instance; TODO confirm init() never reassigns the field. */ @Test + public void testUserTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + TestWindowExample userTask = new TestWindowExample(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask); + Field pipelineMapFld =
StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + + /** Same check as testUserTask, for TestBroadcastExample. */ @Test + public void testSplitTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + + /** Same check as testUserTask, for TestJoinExample. */ @Test + public void testJoinTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + TestJoinExample joinTask = new TestJoinExample(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java new file mode 100644 index 0000000..d22324b --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of split stream tasks. For every input SystemStream, init creates one input stream and applies three independent filter-then-tumbling-window branches to it. + * + */ +public class TestBroadcastExample extends TestExampleBase { + + TestBroadcastExample(Set<SystemStreamPartition> inputs) { + super(inputs); + } + + class MessageType { + String field1; + String field2; + String field3; + String field4; + String parKey; + private long timestamp; + + public long getTimestamp() { + return this.timestamp; + } + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + /** Builds the graph. Each input stream is created from an anonymous StreamSpec whose getProperties() returns null, with null passed for the two remaining createInStream arguments, and is mapped through getInputMessage. Every branch uses a 100 ms tumbling window with sumAggregator and a late trigger of Triggers.any over count(30000) and timeSinceFirstMessage(10 ms). The window results are not passed on to any further operator here. */ @Override + public void init(StreamGraph graph, Config config) { + /* Ignores the message and returns the running value plus one. */ BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; + inputs.keySet().forEach(entry -> { + MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return entry; + } + + @Override public Properties getProperties() { + return null; + } + }, null, null).map(this::getInputMessage); + + inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100),
sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + }); + } + + /** Casts the payload returned by m1.getMessage() to JsonMessageEnvelope. */ JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) { + return (JsonMessageEnvelope) m1.getMessage(); + } + + boolean myFilter1(JsonMessageEnvelope m1) { + // Do user defined processing here: keeps messages whose parKey equals key1 + return m1.getMessage().parKey.equals("key1"); + } + + boolean myFilter2(JsonMessageEnvelope m1) { + // Do user defined processing here: keeps messages whose parKey equals key2 + return m1.getMessage().parKey.equals("key2"); + } + + /** Keeps messages whose parKey equals key3. */ boolean myFilter3(JsonMessageEnvelope m1) { + return m1.getMessage().parKey.equals("key3"); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java new file mode 100644 index 0000000..c4df9d4 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Base class for test examples. Groups the input partitions by their SystemStream so that subclasses can iterate over the distinct input streams. + * + */ +public abstract class TestExampleBase implements StreamGraphBuilder { + + /* Input partitions keyed by the SystemStream they belong to. */ protected final Map<SystemStream, Set<SystemStreamPartition>> inputs; + + /** Builds the stream-to-partitions map: creates an empty set the first time a stream is seen, then adds the partition to that stream's set. */ TestExampleBase(Set<SystemStreamPartition> inputs) { + this.inputs = new HashMap<>(); + for (SystemStreamPartition input : inputs) { + this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>()); + this.inputs.get(input.getSystemStream()).add(input); + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java deleted file mode 100644 index 5f659ba..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import java.lang.reflect.Field; -import org.apache.samza.Partition; -import org.apache.samza.config.Config; -import org.apache.samza.operators.impl.OperatorGraph; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.task.TaskContext; -import org.junit.Test; - -import java.util.HashSet; -import java.util.Set; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - - -/** - * Unit test for {@link StreamOperatorTask} - */ -public class TestFluentStreamTasks { - - private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { - for (int i = 0; i < 4; i++) { - this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i))); - } - } }; - - @Test - public void testUserTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - WindowGraph userTask = new WindowGraph(this.inputPartitions); - StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask); - Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); - pipelineMapFld.setAccessible(true); - OperatorGraph opGraph = (OperatorGraph) 
pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - this.inputPartitions.forEach(partition -> { - assertNotNull(opGraph.get(partition.getSystemStream())); - }); - } - - @Test - public void testSplitTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - BroadcastGraph splitTask = new BroadcastGraph(this.inputPartitions); - StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask); - Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); - pipelineMapFld.setAccessible(true); - OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - this.inputPartitions.forEach(partition -> { - assertNotNull(opGraph.get(partition.getSystemStream())); - }); - } - - @Test - public void testJoinTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - JoinGraph joinTask = new JoinGraph(this.inputPartitions); - StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask); - Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); - pipelineMapFld.setAccessible(true); - OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - this.inputPartitions.forEach(partition -> { - assertNotNull(opGraph.get(partition.getSystemStream())); - }); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java 
b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java new file mode 100644 index 0000000..fe6e7e7 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of unique key-based stream-stream join tasks. init folds all input streams into one chain of joins using MyJoinFunction and sends the result to an output stream. + * + */ +public class TestJoinExample extends TestExampleBase { + + TestJoinExample(Set<SystemStreamPartition> inputs) { + super(inputs); + } + + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + /* Running join result; stays null until the first input stream has been created in init. */ MessageStream<JsonMessageEnvelope> joinOutput = null; + + /** Creates one input stream per SystemStream in inputs, joins each new stream onto joinOutput, then sends joinOutput to an output stream created with a UTF-8 StringSerde and a JsonSerde. NOTE(review): if inputs is empty, joinOutput is still null at the sendTo call and a NullPointerException is thrown. NOTE(review): the output StreamSpec returns null from both getSystemStream() and getProperties(); confirm createOutStream accepts that. */ @Override + public void init(StreamGraph graph, Config config) { + + for (SystemStream input : inputs.keySet()) { + MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream( + new StreamSpec() { + @Override public SystemStream getSystemStream() { + return input; + } + + @Override public Properties getProperties() { + return null; + } + }, null, null).map(this::getInputMessage); + if (joinOutput == null) { + joinOutput = newSource; + } else { + joinOutput = joinOutput.join(newSource, new MyJoinFunction()); + } + } + +
joinOutput.sendTo(graph.createOutStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return null; + } + + @Override public Properties getProperties() { + return null; + } + }, new StringSerde("UTF-8"), new JsonSerde<>())); + + } + + /** Re-wraps the payload of ism as a JsonMessageEnvelope keyed by the payload's joinKey, keeping the original offset and system stream partition. */ private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { + return new JsonMessageEnvelope( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getSystemStreamPartition()); + } + + /** Join function in which both sides are keyed by the envelope key: getFirstKey and getSecondKey both return message.getKey(). */ class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> { + /** Builds the joined envelope: joinKey is taken from m1.getKey(), joinFields holds m1's fields followed by m2's, and the envelope is created with key m1.getMessage().joinKey and null offset and partition. */ JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); + } + + @Override + public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) { + return this.myJoinResult(message, otherMessage); + } + + @Override + public String getFirstKey(JsonMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(JsonMessageEnvelope message) { + return message.getKey(); + } + } +}