SAMZA-1054: Refactor Operator APIs

Some suggestions for an Operator API refactor and misc. cleanup. It does contain some implementation changes, mostly due to deleted, extracted or merged classes (e.g. OperatorFactory + ChainedOperators == OperatorImpls).
Since git marked several moved classes as (delete + new) instead, it's probably best to apply the diff locally and browse the code in an IDE.

Some of the changes, in no particular order:
* Extracted XFunction interfaces into a .functions package in -api.
* -api's internal.Operators is now the -operators's spec.* package. Extracted interfaces and classes. Factory methods are now in OperatorSpecs.
* -api's MessageStreams is now -api's MessageStream interface and -operators's MessageStreamImpl.
* -api's internal.Windows classes are now in -api's .window package. Extracted interfaces and classes, but no implementation changes.
* OperatorFactory + ChainedOperators is now OperatorImpls, which is used from StreamOperatorAdaptorTask.
* Added a NoOpOperatorImpl, which acts as the root node for the OperatorImpl DAG returned by OperatorImpls.
* Removed usages of reactivestreams APIs since the current code looks simpler without them. We can add them back when we need features like backpressure.
* Removed the InputSystemMessage interface.
* Made field names consistent (e.g. Fn suffix for functions everywhere).
* Some method/class visibility changes due to moved classes.
* General documentation changes, mostly to make public APIs clearer.

There are additional questions/tasks that we can address in future RBs:
* Updating Window and Trigger APIs.
* Merging samza-operator into samza-core.
* Questions about Message timestamp and Offset comparison semantics.
* Questions about OperatorSpec serialization (e.g. ID generation).
* Questions about StateStoreImpl and StoreFunctions.
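
To illustrate the fluent style that the extracted function interfaces are meant to support, here is a minimal, self-contained sketch. It is not the code in this commit: the interfaces below are simplified stand-ins (no MessageEnvelope bound, and a single-argument SinkFunction, which is an assumption), and the hypothetical ListStream applies each transform eagerly to an in-memory list, whereas the real MessageStreamImpl registers operator specs that are later turned into an OperatorImpl DAG.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FluentSketch {

  // Simplified stand-ins for the interfaces extracted into the .functions package.
  // The names mirror the commit; the signatures are assumptions for illustration only.
  @FunctionalInterface
  interface MapFunction<M, OM> {
    OM apply(M message);
  }

  @FunctionalInterface
  interface FilterFunction<M> {
    boolean apply(M message);
  }

  @FunctionalInterface
  interface SinkFunction<M> {
    void apply(M message);
  }

  // Cut-down version of the MessageStream interface: transforms only, no state.
  interface MessageStream<M> {
    <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);

    MessageStream<M> filter(FilterFunction<M> filterFn);

    void sink(SinkFunction<M> sinkFn);
  }

  // Hypothetical eager implementation backed by a list; it exists only to make the sketch runnable.
  static final class ListStream<M> implements MessageStream<M> {
    private final List<M> messages;

    ListStream(List<M> messages) {
      this.messages = messages;
    }

    @Override
    public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
      List<TM> out = new ArrayList<>();
      for (M m : messages) {
        out.add(mapFn.apply(m));
      }
      return new ListStream<>(out);
    }

    @Override
    public MessageStream<M> filter(FilterFunction<M> filterFn) {
      List<M> out = new ArrayList<>();
      for (M m : messages) {
        if (filterFn.apply(m)) {
          out.add(m);
        }
      }
      return new ListStream<>(out);
    }

    @Override
    public void sink(SinkFunction<M> sinkFn) {
      for (M m : messages) {
        sinkFn.apply(m);
      }
    }
  }

  // Chains filter -> map -> sink and returns whatever reached the sink.
  public static List<String> run(List<String> input) {
    List<String> collected = new ArrayList<>();
    new ListStream<>(input)
        .filter(m -> m.length() > 1)
        .map(String::toUpperCase)
        .sink(collected::add);
    return collected;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("a", "bb", "ccc")));
  }
}
```

Because the user-facing type is an interface whose methods take small function interfaces, callers can pass lambdas or method references, and the implementation behind the interface can change without touching user code.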

Author: Prateek Maheshwari <pmahe...@linkedin.com>
Reviewers: Yi Pan <nickpa...@gmail.com>, Jagadish <jagadish1...@gmail.com>

Closes #25 from prateekm/master

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/00543804
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/00543804
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/00543804

Branch: refs/heads/master
Commit: 00543804b3c32f1cbea0212e4a94e360b5a324cc
Parents: a980c96
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Thu Dec 1 14:50:52 2016 -0800
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Thu Dec 1 14:50:52 2016 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java | 197 +++-----
 .../apache/samza/operators/MessageStreams.java | 81 ----
 .../samza/operators/StreamOperatorTask.java | 51 ++
 .../apache/samza/operators/TriggerBuilder.java | 323 -------------
 .../org/apache/samza/operators/WindowState.java | 81 ----
 .../org/apache/samza/operators/Windows.java | 203 --------
 .../operators/data/IncomingSystemMessage.java | 76 ---
 .../data/IncomingSystemMessageEnvelope.java | 63 +++
 .../operators/data/InputSystemMessage.java | 45 --
 .../apache/samza/operators/data/Message.java | 64 ---
 .../samza/operators/data/MessageEnvelope.java | 54 +++
 .../operators/functions/FilterFunction.java | 40 ++
 .../operators/functions/FlatMapFunction.java | 44 ++
 .../samza/operators/functions/JoinFunction.java | 44 ++
 .../samza/operators/functions/MapFunction.java | 41 ++
 .../samza/operators/functions/SinkFunction.java | 46 ++
 .../samza/operators/internal/Operators.java | 469 -------------------
 .../samza/operators/internal/Trigger.java | 95 ----
 .../samza/operators/internal/WindowFn.java | 60 ---
 .../samza/operators/internal/WindowOutput.java | 55 ---
 .../operators/task/StreamOperatorTask.java | 45 --
 .../samza/operators/windows/SessionWindow.java | 
102 ++++ .../samza/operators/windows/StoreFunctions.java | 67 +++ .../apache/samza/operators/windows/Trigger.java | 94 ++++ .../samza/operators/windows/TriggerBuilder.java | 320 +++++++++++++ .../apache/samza/operators/windows/Window.java | 49 ++ .../samza/operators/windows/WindowFn.java | 59 +++ .../samza/operators/windows/WindowOutput.java | 51 ++ .../samza/operators/windows/WindowState.java | 85 ++++ .../apache/samza/operators/windows/Windows.java | 100 ++++ .../org/apache/samza/operators/TestMessage.java | 47 -- .../samza/operators/TestMessageEnvelope.java | 61 +++ .../samza/operators/TestMessageStream.java | 180 ------- .../samza/operators/TestMessageStreams.java | 35 -- .../samza/operators/TestOutputMessage.java | 47 -- .../operators/TestOutputMessageEnvelope.java | 43 ++ .../samza/operators/TestTriggerBuilder.java | 214 --------- .../org/apache/samza/operators/TestWindows.java | 106 ----- .../data/TestIncomingSystemMessage.java | 5 +- .../samza/operators/data/TestLongOffset.java | 9 +- .../samza/operators/internal/TestOperators.java | 128 ----- .../samza/operators/internal/TestTrigger.java | 68 --- .../operators/internal/TestWindowOutput.java | 36 -- .../samza/operators/windows/TestTrigger.java | 68 +++ .../operators/windows/TestTriggerBuilder.java | 226 +++++++++ .../operators/windows/TestWindowOutput.java | 36 ++ .../samza/operators/windows/TestWindows.java | 109 +++++ .../samza/operators/MessageStreamImpl.java | 134 ++++++ .../apache/samza/operators/StateStoreImpl.java | 56 +++ .../operators/StreamOperatorAdaptorTask.java | 105 +++++ .../samza/operators/impl/ChainedOperators.java | 119 ----- .../samza/operators/impl/OperatorFactory.java | 85 ---- .../samza/operators/impl/OperatorImpl.java | 76 +-- .../samza/operators/impl/OperatorImpls.java | 125 +++++ .../operators/impl/PartialJoinOperatorImpl.java | 46 ++ .../samza/operators/impl/ProcessorContext.java | 53 --- .../samza/operators/impl/RootOperatorImpl.java | 36 ++ .../impl/SessionWindowOperatorImpl.java 
| 67 +++ .../operators/impl/SimpleOperatorImpl.java | 49 -- .../samza/operators/impl/SinkOperatorImpl.java | 22 +- .../samza/operators/impl/StateStoreImpl.java | 56 --- .../operators/impl/StreamOperatorImpl.java | 47 ++ .../operators/impl/join/PartialJoinOpImpl.java | 44 -- .../impl/window/SessionWindowImpl.java | 65 --- .../samza/operators/spec/OperatorSpec.java | 37 ++ .../samza/operators/spec/OperatorSpecs.java | 116 +++++ .../operators/spec/PartialJoinOperatorSpec.java | 104 ++++ .../samza/operators/spec/SinkOperatorSpec.java | 62 +++ .../operators/spec/StreamOperatorSpec.java | 67 +++ .../operators/spec/WindowOperatorSpec.java | 119 +++++ .../samza/task/StreamOperatorAdaptorTask.java | 85 ---- .../apache/samza/operators/BroadcastTask.java | 101 ++++ .../org/apache/samza/operators/JoinTask.java | 77 +++ .../operators/TestFluentStreamAdaptorTask.java | 85 ++++ .../samza/operators/TestFluentStreamTasks.java | 112 +++++ .../samza/operators/TestMessageStreamImpl.java | 203 ++++++++ .../samza/operators/TestStateStoreImpl.java | 72 +++ .../org/apache/samza/operators/WindowTask.java | 70 +++ .../data/JsonIncomingSystemMessageEnvelope.java | 60 +++ .../operators/impl/TestChainedOperators.java | 129 ----- .../operators/impl/TestOperatorFactory.java | 93 ---- .../samza/operators/impl/TestOperatorImpl.java | 48 +- .../samza/operators/impl/TestOperatorImpls.java | 183 ++++++++ .../operators/impl/TestProcessorContext.java | 40 -- .../operators/impl/TestSessionWindowImpl.java | 111 +++++ .../operators/impl/TestSimpleOperatorImpl.java | 55 --- .../operators/impl/TestSinkOperatorImpl.java | 25 +- .../operators/impl/TestStateStoreImpl.java | 69 --- .../operators/impl/TestStreamOperatorImpl.java | 60 +++ .../impl/window/TestSessionWindowImpl.java | 105 ----- .../samza/operators/spec/TestOperatorSpecs.java | 114 +++++ .../samza/task/BroadcastOperatorTask.java | 102 ---- .../samza/task/InputJsonSystemMessage.java | 67 --- .../org/apache/samza/task/JoinOperatorTask.java | 80 ---- 
.../task/TestStreamOperatorAdaptorTask.java | 80 ---- .../samza/task/TestStreamOperatorTasks.java | 105 ----- .../apache/samza/task/WindowOperatorTask.java | 71 --- 97 files changed, 4374 insertions(+), 4240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index dede631..d18536b 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -16,173 +16,112 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators; -import org.apache.samza.operators.internal.Operators.Operator; -import org.apache.samza.operators.internal.WindowOutput; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FilterFunction; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.windows.Window; +import org.apache.samza.operators.windows.WindowOutput; +import org.apache.samza.operators.windows.WindowState; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import 
java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; /** - * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to - * directly program the stream processing stages that processes a stream and generate another one. + * Represents a stream of {@link MessageEnvelope}s. + * <p> + * A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API. * - * @param <M> Type of message in this stream + * @param <M> type of {@link MessageEnvelope}s in this stream */ @InterfaceStability.Unstable -public class MessageStream<M extends Message> { - - private final Set<Operator> subscribers = new HashSet<>(); - - /** - * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}. - * - * NOTE: This is purely an internal API and should not be used directly by programmers. - * - * @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object - */ - public Collection<Operator> getSubscribers() { - return Collections.unmodifiableSet(this.subscribers); - } - - /** - * Public API methods start here - */ +public interface MessageStream<M extends MessageEnvelope> { /** - * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value + * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the + * transformed {@link MessageStream}. 
* - * @param <A> the type of input {@code a} - * @param <B> the type of input {@code b} - * @param <C> the type of input {@code c} + * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope} + * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} + * @return the transformed {@link MessageStream} */ - @FunctionalInterface - public interface VoidFunction3<A, B, C> { - public void apply(A a, B b, C c); - } + <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn); /** - * Method to apply a map function (1:1) on a {@link MessageStream} + * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream} + * to n {@link MessageEnvelope}s in the transformed {@link MessageStream} * - * @param mapper the mapper function to map one input {@link Message} to one output {@link Message} - * @param <OM> the type of the output {@link Message} in the output {@link MessageStream} - * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} + * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s + * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} + * @return the transformed {@link MessageStream} */ - public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) { - Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() { { - OM r = mapper.apply(m); - if (r != null) { - this.add(r); - } - } }); - this.subscribers.add(op); - return op.getOutputStream(); - } + <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn); /** - * Method to apply a flatMap function (1:n) on a {@link MessageStream} + * Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the + * 
transformed {@link MessageStream}. + * <p> + * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream} + * should be retained in the transformed {@link MessageStream}. * - * @param flatMapper the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s - * @param <OM> the type of the output {@link Message} in the output {@link MessageStream} - * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} + * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream} + * @return the transformed {@link MessageStream} */ - public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) { - Operator<OM> op = Operators.getStreamOperator(flatMapper); - this.subscribers.add(op); - return op.getOutputStream(); - } + MessageStream<M> filter(FilterFunction<M> filterFn); /** - * Method to apply a filter function on a {@link MessageStream} + * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output + * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}. 
* - * @param filter the filter function to filter input {@link Message}s from the input {@link MessageStream} - * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream} + * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems */ - public MessageStream<M> filter(Function<M, Boolean> filter) { - Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() { { - if (filter.apply(t)) { - this.add(t); - } - } }); - this.subscribers.add(op); - return op.getOutputStream(); - } + void sink(SinkFunction<M> sinkFn); /** - * Method to send an input {@link MessageStream} to an output {@link org.apache.samza.system.SystemStream}, and allows the output {@link MessageStream} - * to be consumed by downstream stream operators again. + * Groups the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window} semantics + * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of + * {@link WindowOutput}s. + * <p> + * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows. 
* - * @param sink the user-defined sink function to send the input {@link Message}s to the external output systems + * @param window the {@link Window} to group and process {@link MessageEnvelope}s from this {@link MessageStream} + * @param <WK> the type of key in the {@link WindowOutput} from the {@link Window} + * @param <WV> the type of value in the {@link WindowOutput} from the {@link Window} + * @param <WS> the type of window state kept in the {@link Window} + * @param <WM> the type of {@link WindowOutput} in the transformed {@link MessageStream} + * @return the transformed {@link MessageStream} */ - public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { - this.subscribers.add(Operators.getSinkOperator(sink)); - } + <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window( + Window<M, WK, WV, WM> window); /** - * Method to perform a window function (i.e. a group-by, aggregate function) on a {@link MessageStream} + * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}. + * <p> + * We currently only support 2-way joins. 
* - * @param window the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream} - * @param <WK> the type of key in the output {@link Message} from the {@link Windows.Window} function - * @param <WV> the type of output value from - * @param <WS> the type of window state kept in the {@link Windows.Window} function - * @param <WM> the type of {@link org.apache.samza.operators.internal.WindowOutput} message from the {@link Windows.Window} function - * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream} - */ - public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Windows.Window<M, WK, WV, WM> window) { - Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window)); - this.subscribers.add(wndOp); - return wndOp.getOutputStream(); - } - - /** - * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins. 
- * - * @param other the other stream to be joined w/ - * @param merger the common function to merge messages from this {@link MessageStream} and {@code other} + * @param otherStream the other {@link MessageStream} to be joined with + * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream} * @param <K> the type of join key - * @param <JM> the type of message in the {@link Message} from the other join stream - * @param <RM> the type of message in the {@link Message} from the join function - * @return the output {@link MessageStream} from the join function {@code joiner} + * @param <OM> the type of {@link MessageEnvelope}s in the other stream + * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn} + * @return the joined {@link MessageStream} */ - public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other, - BiFunction<M, JM, RM> merger) { - MessageStream<RM> outputStream = new MessageStream<>(); - - BiFunction<M, JM, RM> parJoin1 = merger::apply; - BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m); - - // TODO: need to add default store functions for the two partial join functions - - other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream)); - this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream)); - return outputStream; - } + <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream, + JoinFunction<M, OM, RM> joinFn); /** - * Method to merge all {@code others} streams w/ this {@link MessageStream}. The merging streams must have the same type {@code M} + * Merge all {@code otherStreams} with this {@link MessageStream}. + * <p> + * The merging streams must have the same {@link MessageEnvelope} type {@code M}. 
* - * @param others other streams to be merged w/ this one - * @return the merged output stream + * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream} + * @return the merged {@link MessageStream} */ - public MessageStream<M> merge(Collection<MessageStream<M>> others) { - MessageStream<M> outputStream = new MessageStream<>(); - - others.add(this); - others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream))); - return outputStream; - } - + MessageStream<M> merge(Collection<MessageStream<M>> otherStreams); + } http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java deleted file mode 100644 index 51bf482..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.IncomingSystemMessage; -import org.apache.samza.system.SystemStreamPartition; - - -/** - * This class defines all methods to create a {@link MessageStream} object. Users can use this to create an {@link MessageStream} - * from a specific input source. - * - */ -@InterfaceStability.Unstable -public final class MessageStreams { - - /** - * private constructor to prevent instantiation - */ - private MessageStreams() {} - - /** - * private class for system input/output {@link MessageStream} - */ - public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> { - /** - * The corresponding {@link org.apache.samza.system.SystemStream} - */ - private final SystemStreamPartition ssp; - - /** - * Constructor for input system stream - * - * @param ssp the input {@link SystemStreamPartition} for the input {@link SystemMessageStream} - */ - private SystemMessageStream(SystemStreamPartition ssp) { - this.ssp = ssp; - } - - /** - * Getter for the {@link SystemStreamPartition} of the input - * - * @return the input {@link SystemStreamPartition} - */ - public SystemStreamPartition getSystemStreamPartition() { - return this.ssp; - } - } - - /** - * Public static API methods start here - */ - - /** - * Static API method to create a {@link MessageStream} from a system input stream - * - * @param ssp the input {@link SystemStreamPartition} - * @return the {@link MessageStream} object takes {@code ssp} as the input - */ - public static SystemMessageStream input(SystemStreamPartition ssp) { - return new SystemMessageStream(ssp); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java new file mode 100644 index 0000000..16cf27a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Map; + + +/** + * A {@link StreamOperatorTask} is the basic interface to implement for processing {@link MessageStream}s. + * Implementations can describe the transformation steps for each {@link MessageStream} in the + * {@link #transform} method using {@link MessageStream} APIs. 
+ * <p> + * Implementations may be augmented by implementing {@link org.apache.samza.task.InitableTask}, + * {@link org.apache.samza.task.WindowableTask} and {@link org.apache.samza.task.ClosableTask} interfaces, + * but should not implement {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask} + * interfaces. + */ +@InterfaceStability.Unstable +public interface StreamOperatorTask { + + /** + * Describe the transformation steps for each {@link MessageStream}s for this task using the + * {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one {@link SystemStreamPartition} + * in the input system. + * + * @param messageStreams the {@link MessageStream}s that receive {@link IncomingSystemMessageEnvelope}s + * from their corresponding {@link org.apache.samza.system.SystemStreamPartition} + */ + void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java deleted file mode 100644 index 5b3f4d0..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Trigger; - -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines a builder of {@link org.apache.samza.operators.internal.Trigger} object for a {@link Windows.Window}. The triggers are categorized into - * three types: - * - * <p> - * early trigger: defines the condition when the first output from the window function is sent. - * late trigger: defines the condition when the updated output after the first output is sent. - * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers - * </p> - * - * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of the each individual trigger (i.e. OR). 
- * - * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.internal} to create triggers - * - * - * @param <M> the type of input {@link Message} to the {@link Windows.Window} - * @param <V> the type of output value from the {@link Windows.Window} - */ -@InterfaceStability.Unstable -public final class TriggerBuilder<M extends Message, V> { - - /** - * Predicate helper to OR multiple trigger conditions - */ - static class PredicateHelper { - static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) { - return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s); - } - - static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) { - return s -> lhs.apply(s) || rhs.apply(s); - } - } - - /** - * The early trigger condition that determines the first output from the {@link Windows.Window} - */ - private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null; - - /** - * The late trigger condition that determines the late output(s) from the {@link Windows.Window} - */ - private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null; - - /** - * The system timer based trigger conditions that guarantees the {@link Windows.Window} proceeds forward - */ - private Function<WindowState<V>, Boolean> timerTrigger = null; - - /** - * The state updater function to be applied after the first output is triggered - */ - private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity(); - - /** - * The state updater function to be applied after the late output is triggered - */ - private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity(); - - /** - * Helper method to add a trigger condition - * - * @param currentTrigger current trigger condition - * @param newTrigger new trigger condition - * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger} - */ - private BiFunction<M, WindowState<V>, 
Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger, - BiFunction<M, WindowState<V>, Boolean> newTrigger) { - if (currentTrigger == null) { - return newTrigger; - } - - return PredicateHelper.or(currentTrigger, newTrigger); - } - - /** - * Helper method to add a system timer trigger - * - * @param currentTimer current timer condition - * @param newTimer new timer condition - * @return combined timer condition that is {@code currentTimer} OR {@code newTimer} - */ - private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer, - Function<WindowState<V>, Boolean> newTimer) { - if (currentTimer == null) { - return newTimer; - } - - return PredicateHelper.or(currentTimer, newTimer); - } - - /** - * default constructor to prevent instantiation - */ - private TriggerBuilder() {} - - /** - * Constructor that set the size limit as the early trigger for a window - * - * @param sizeLimit the number of messages in a window that would trigger the first output - */ - private TriggerBuilder(long sizeLimit) { - this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit; - } - - /** - * Constructor that set the event time length as the early trigger - * - * @param eventTimeFunction the function that calculate the event time in nano-second from the input {@link Message} - * @param wndLenMs the window length in event time in milli-second - */ - private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) { - this.earlyTrigger = (m, s) -> - TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(), - eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs; - } - - /** - * Constructor that set the special token message as the early trigger - * - * @param tokenFunc the function that checks whether an input {@link Message} is a token message that triggers window output - */ - private TriggerBuilder(Function<M, Boolean> tokenFunc) { - this.earlyTrigger = (m, 
s) -> tokenFunc.apply(m); - } - - /** - * Build method that creates an {@link org.apache.samza.operators.internal.Trigger} object based on the trigger conditions set in {@link TriggerBuilder} - * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object - * - * @return the final {@link org.apache.samza.operators.internal.Trigger} object - */ - Trigger<M, WindowState<V>> build() { - return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater); - } - - /** - * Public API methods start here - */ - - - /** - * API method to allow users to set an update method to update the output value after the first window output is triggered - * by the early trigger condition - * - * @param onTriggerFunc the method to update the output value after the early trigger - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) { - this.earlyTriggerUpdater = s -> { - s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); - return s; - }; - return this; - } - - /** - * API method to allow users to set an update method to update the output value after a late window output is triggered - * by the late trigger condition - * - * @param onTriggerFunc the method to update the output value after the late trigger - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) { - this.lateTriggerUpdater = s -> { - s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); - return s; - }; - return this; - } - - /** - * API method to allow users to add a system timer trigger based on timeout after the last message received in the window - * - * @param timeoutMs the timeout in ms after the last message received in the window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, 
V> addTimeoutSinceLastMessage(long timeoutMs) { - this.timerTrigger = this.addTimerTrigger(this.timerTrigger, - s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); - return this; - } - - /** - * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window - * - * @param timeoutMs the timeout in ms after the first message received in the window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) { - this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s -> - TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); - return this; - } - - /** - * API method allow users to add a late trigger based on the window size limit - * - * @param sizeLimit limit on the number of messages in window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) { - this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit); - return this; - } - - /** - * API method to allow users to define a customized late trigger function based on input message and the window state - * - * @param lateTrigger the late trigger condition based on input {@link Message} and the current {@link WindowState} - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) { - this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger); - return this; - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit - * - * @param sizeLimit window size limit - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public 
static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) { - return new TriggerBuilder<M, V>(sizeLimit); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window - * - * - * @param eventTimeFunc the function to get the event time from the input message - * @param eventTimeWndSizeMs the event time window size in Ms - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) { - return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages - * - * @param tokenFunc the function to determine whether an input message is a window token or not - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) { - return new TriggerBuilder<M, V>(tokenFunc); - } - - /** - * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState} - * - * @param earlyTrigger the user defined early trigger condition - * @param <M> the input message type - * @param <V> the output value from the window - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) { - TriggerBuilder<M, V> newTriggers = new TriggerBuilder<M, V>(); - newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger); - return 
newTriggers; - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last message received in the window - * - * @param timeoutMs timeout in ms after the last message received - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) { - return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window - * - * @param timeoutMs timeout in ms after the first message received - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) { - return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/WindowState.java deleted file mode 100644 index 759f2d8..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/WindowState.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; - - -/** - * This interface defines the methods a window state class has to implement. The programmers are allowed to implement - * customized window state to be stored in window state stores by implementing this interface class. - * - * @param <WV> the type for window output value - */ -@InterfaceStability.Unstable -public interface WindowState<WV> { - /** - * Method to get the system time when the first message in the window is received - * - * @return nano-second of system time for the first message received in the window - */ - long getFirstMessageTimeNs(); - - /** - * Method to get the system time when the last message in the window is received - * - * @return nano-second of system time for the last message received in the window - */ - long getLastMessageTimeNs(); - - /** - * Method to get the earliest event time in the window - * - * @return the earliest event time in nano-second in the window - */ - long getEarliestEventTimeNs(); - - /** - * Method to get the latest event time in the window - * - * @return the latest event time in nano-second in the window - */ - long getLatestEventTimeNs(); - - /** - * Method to get the total number of messages received in the window - * - * @return number of messages in the window - */ - long getNumberMessages(); - - /** - * Method to get the 
corresponding window's output value - * - * @return the corresponding window's output value - */ - WV getOutputValue(); - - /** - * Method to set the corresponding window's output value - * - * @param value the corresponding window's output value - */ - void setOutputValue(WV value); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/Windows.java deleted file mode 100644 index 6619f41..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/Windows.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators; -import org.apache.samza.operators.internal.Trigger; -import org.apache.samza.operators.internal.WindowFn; -import org.apache.samza.operators.internal.WindowOutput; -import org.apache.samza.storage.kv.Entry; - -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be - * used by the user (i.e. programmers) to create {@link Window} function directly. - * - */ -@InterfaceStability.Unstable -public final class Windows { - - /** - * private constructor to prevent instantiation - */ - private Windows() {} - - /** - * This class defines a session window function class - * - * @param <M> the type of input {@link Message} - * @param <WK> the type of session key in the session window - * @param <WV> the type of output value in each session window - */ - static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> { - - /** - * Constructor. Made private s.t. 
it can only be instantiated via the static API methods in {@link Windows} - * - * @param sessionKeyFunction function to get the session key from the input {@link Message} - * @param aggregator function to calculate the output value based on the input {@link Message} and current output value - */ - private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) { - this.wndKeyFunction = sessionKeyFunction; - this.aggregator = aggregator; - } - - /** - * function to calculate the window key from input message - */ - private final Function<M, WK> wndKeyFunction; - - /** - * function to calculate the output value from the input message and the current output value - */ - private final BiFunction<M, WV, WV> aggregator; - - /** - * trigger condition that determines when to send out the output value in a {@link WindowOutput} message - */ - private Trigger<M, WindowState<WV>> trigger = null; - - //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window} - private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null; - - /** - * Public API methods start here - */ - - /** - * Public API method to define the watermark trigger for the window operator - * - * @param wndTrigger {@link Trigger} function defines the watermark trigger for this {@link SessionWindow} - * @return The window operator w/ the defined watermark trigger - */ - @Override - public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) { - this.trigger = wndTrigger.build(); - return this; - } - - private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { - // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers; - return null; - } - - private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() { - return new WindowFn<M, WK, WindowState<WV>, 
WindowOutput<WK, WV>>() { - - @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { - return SessionWindow.this.getTransformFunc(); - } - - @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() { - return SessionWindow.this.storeFunctions; - } - - @Override public Trigger<M, WindowState<WV>> getTrigger() { - return SessionWindow.this.trigger; - } - }; - } - } - - static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn( - Window<M, WK, WV, WM> window) { - if (window instanceof SessionWindow) { - SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window; - return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn(); - } - throw new IllegalArgumentException("Input window type not supported."); - } - - /** - * Public static API methods start here - * - */ - - /** - * The public programming interface class for window function - * - * @param <M> the type of input {@link Message} - * @param <WK> the type of key to the {@link Window} - * @param <WV> the type of output value in the {@link WindowOutput} - * @param <WM> the type of message in the window output stream - */ - @InterfaceStability.Unstable - public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> { - - /** - * Set the triggers for this {@link Window} - * - * @param wndTrigger trigger conditions set by the programmers - * @return the {@link Window} function w/ the trigger {@code wndTrigger} - */ - Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger); - } - - /** - * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input messages - * - * @param sessionKeyFunction function to calculate session window key - * @param <M> type of input {@link Message} - * @param <WK> type of the session window key - * @return the {@link Window} function 
for the session - */ - public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> { - c.add(m); - return c; - }); - } - - /** - * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input messages - * - * @param sessionKeyFunction function to calculate session window key - * @param sessionInfoExtractor function to retrieve session info of type {@code SI} from the input message of type {@code M} - * @param <M> type of the input {@link Message} - * @param <WK> type of the session window key - * @param <SI> type of the session information retrieved from each input message of type {@code M} - * @return the {@link Window} function for the session - */ - public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction, - Function<M, SI> sessionInfoExtractor) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> { - c.add(sessionInfoExtractor.apply(m)); - return c; - }); - } - - /** - * Static API method to create a {@link SessionWindow} as a counter of input messages - * - * @param sessionKeyFunction function to calculate session window key - * @param <M> type of the input {@link Message} - * @param <WK> type of the session window key - * @return the {@link Window} function for the session - */ - public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java ---------------------------------------------------------------------- diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java deleted file mode 100644 index 3c9874d..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This class implements a {@link Message} that encapsulates an {@link IncomingMessageEnvelope} from the system
- *
- */
-public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> {
-  /**
-   * Incoming message envelope
-   */
-  private final IncomingMessageEnvelope imsg;
-
-  /**
-   * The receive time of this incoming message
-   */
-  private final long recvTimeNano;
-
-  /**
-   * Ctor to create a {@code IncomingSystemMessage} from {@link IncomingMessageEnvelope}
-   *
-   * @param imsg The incoming system message
-   */
-  public IncomingSystemMessage(IncomingMessageEnvelope imsg) {
-    this.imsg = imsg;
-    this.recvTimeNano = System.nanoTime();
-  }
-
-  @Override
-  public Object getMessage() {
-    return this.imsg.getMessage();
-  }
-
-  @Override
-  public Object getKey() {
-    return this.imsg.getKey();
-  }
-
-  @Override
-  public long getTimestamp() {
-    return this.recvTimeNano;
-  }
-
-  @Override
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.imsg.getOffset());
-  }
-
-  @Override
-  public SystemStreamPartition getSystemStreamPartition() {
-    return imsg.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
new file mode 100644
index 0000000..a65809c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
+ * and its {@link Offset} within the {@link SystemStreamPartition}.
+ * <p>
+ * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
+ */
+public class IncomingSystemMessageEnvelope implements MessageEnvelope<Object, Object> {
+
+  private final IncomingMessageEnvelope ime;
+
+  /**
+   * Creates an {@code IncomingSystemMessageEnvelope} from the {@link IncomingMessageEnvelope}.
+   *
+   * @param ime the {@link IncomingMessageEnvelope} from the input system.
+   */
+  public IncomingSystemMessageEnvelope(IncomingMessageEnvelope ime) {
+    this.ime = ime;
+  }
+
+  @Override
+  public Object getKey() {
+    return this.ime.getKey();
+  }
+
+  @Override
+  public Object getMessage() {
+    return this.ime.getMessage();
+  }
+
+  public Offset getOffset() {
+    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+    // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
+    return new LongOffset(this.ime.getOffset());
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return this.ime.getSystemStreamPartition();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
deleted file mode 100644
index 5c23e74..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This interface defines additional methods a message from an system input should implement, including the methods to
- * get {@link SystemStreamPartition} and the {@link Offset} of the input system message.
- */
-@InterfaceStability.Unstable
-public interface InputSystemMessage<O extends Offset> {
-
-  /**
-   * Get the input message's {@link SystemStreamPartition}
-   *
-   * @return the {@link SystemStreamPartition} this message is coming from
-   */
-  SystemStreamPartition getSystemStreamPartition();
-
-  /**
-   * Get the offset of the message in the input stream. This should be used to uniquely identify a message in an input stream.
-   *
-   * @return The offset of the message in the input stream.
-   */
-  O getOffset();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Message.java b/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
deleted file mode 100644
index 8441682..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This class defines the generic interface of {@link Message}, which is a entry in the input/output stream.
- *
- * <p>The {@link Message} models the basic operatible unit in streaming SQL processes in Samza.
- *
- */
-@InterfaceStability.Unstable
-public interface Message<K, M> {
-
-  /**
-   * Access method to get the corresponding message body in {@link Message}
-   *
-   * @return Message object in this {@link Message}
-   */
-  M getMessage();
-
-  /**
-   * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key
-   *
-   * @return A boolean value indicates whether the current message is a delete or insert message
-   */
-  default boolean isDelete() {
-    return false;
-  };
-
-  /**
-   * Access method to the key of the message
-   *
-   * @return The key of the message
-   */
-  K getKey();
-
-  /**
-   * Get the message creation timestamp of the message.
-   *
-   * @return The message's timestamp in nano seconds.
-   */
-  long getTimestamp();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
new file mode 100644
index 0000000..ad64231
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.data;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s.
+ */
+@InterfaceStability.Unstable
+public interface MessageEnvelope<K, M> {
+
+  /**
+   * Get the key for this {@link MessageEnvelope}.
+   *
+   * @return the key for this {@link MessageEnvelope}
+   */
+  K getKey();
+
+  /**
+   * Get the message in this {@link MessageEnvelope}.
+   *
+   * @return the message in this {@link MessageEnvelope}
+   */
+  M getMessage();
+
+  /**
+   * Whether this {@link MessageEnvelope} indicates deletion of a previous message with this key.
+   *
+   * @return true if the current {@link MessageEnvelope} indicates deletion of a previous message with this key
+   */
+  default boolean isDelete() {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
new file mode 100644
index 0000000..e611cd0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * A function that specifies whether a {@link MessageEnvelope} should be retained for further processing or filtered out.
+ * @param <M> type of the input {@link MessageEnvelope}
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface FilterFunction<M extends MessageEnvelope> {
+
+  /**
+   * Returns a boolean indicating whether this {@link MessageEnvelope} should be retained or filtered out.
+ * @param message the {@link MessageEnvelope} to be checked + * @return true if {@link MessageEnvelope} should be retained + */ + boolean apply(M message); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java new file mode 100644 index 0000000..dbc0bd9 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; + +import java.util.Collection; + + +/** + * A function that transforms a {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s, + * possibly of a different type. 
+ * @param <M> type of the input {@link MessageEnvelope} + * @param <OM> type of the transformed {@link MessageEnvelope}s + */ +@InterfaceStability.Unstable +@FunctionalInterface +public interface FlatMapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> { + + /** + * Transforms the provided {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s. + * @param message the {@link MessageEnvelope} to be transformed + * @return a collection of 0 or more transformed {@link MessageEnvelope}s + */ + Collection<OM> apply(M message); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java new file mode 100644 index 0000000..8cb1fce --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; + + +/** + * A function that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces + * a joined {@link MessageEnvelope}. + * @param <M> type of the input {@link MessageEnvelope} + * @param <JM> type of the {@link MessageEnvelope} to join with + * @param <RM> type of the joined {@link MessageEnvelope} + */ +@InterfaceStability.Unstable +@FunctionalInterface +public interface JoinFunction<M extends MessageEnvelope, JM extends MessageEnvelope, RM extends MessageEnvelope> { + + /** + * Joins the provided {@link MessageEnvelope}s and produces the joined {@link MessageEnvelope}. + * @param message the input {@link MessageEnvelope} + * @param otherMessage the {@link MessageEnvelope} to join with + * @return the joined {@link MessageEnvelope} + */ + RM apply(M message, JM otherMessage); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java new file mode 100644 index 0000000..04919a7 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; + + +/** + * A function that transforms a {@link MessageEnvelope} into another {@link MessageEnvelope}, possibly of a different type. + * @param <M> type of the input {@link MessageEnvelope} + * @param <OM> type of the transformed {@link MessageEnvelope} + */ +@InterfaceStability.Unstable +@FunctionalInterface +public interface MapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> { + + /** + * Transforms the provided {@link MessageEnvelope} into another {@link MessageEnvelope}. + * @param message the {@link MessageEnvelope} to be transformed + * @return the transformed {@link MessageEnvelope} + */ + OM apply(M message); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java new file mode 100644 index 0000000..505da92 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * A function that allows sending a {@link MessageEnvelope} to an output system. + * @param <M> type of the input {@link MessageEnvelope} + */ +@InterfaceStability.Unstable +@FunctionalInterface +public interface SinkFunction<M extends MessageEnvelope> { + + /** + * Allows sending the provided {@link MessageEnvelope} to an output {@link org.apache.samza.system.SystemStream} using + * the provided {@link MessageCollector}. Also provides access to the {@link TaskCoordinator} to request commits + * or shut the container down. + * + * @param message the {@link MessageEnvelope} to be sent to an output {@link org.apache.samza.system.SystemStream} + * @param messageCollector the {@link MessageCollector} to use to send the {@link MessageEnvelope} + * @param taskCoordinator the {@link TaskCoordinator} to request commits or shutdown + */ + void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator); + +}
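Editorial note: the extracted function interfaces above are all annotated @FunctionalInterface, so they can be supplied as lambdas. The following is a minimal sketch of that usage, not code from this commit. To stay runnable without Samza on the classpath it declares local stand-ins that mirror the shapes of MessageEnvelope, FilterFunction, MapFunction, FlatMapFunction and JoinFunction as shown in the diff. The names FunctionsSketch, StringEnvelope, NON_EMPTY, UPPER, SPLIT_WORDS, CONCAT and demo are hypothetical and exist only for illustration. SinkFunction is left out because it depends on MessageCollector and TaskCoordinator.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class FunctionsSketch {

  // Local stand-ins mirroring the interface shapes in the diff above.
  // They are NOT the Samza classes; they only let the sketch compile alone.
  interface MessageEnvelope<K, M> {
    K getKey();
    M getMessage();
    default boolean isDelete() { return false; }
  }

  @FunctionalInterface
  interface FilterFunction<M extends MessageEnvelope> {
    boolean apply(M message);
  }

  @FunctionalInterface
  interface MapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
    OM apply(M message);
  }

  @FunctionalInterface
  interface FlatMapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
    Collection<OM> apply(M message);
  }

  @FunctionalInterface
  interface JoinFunction<M extends MessageEnvelope, JM extends MessageEnvelope, RM extends MessageEnvelope> {
    RM apply(M message, JM otherMessage);
  }

  // Hypothetical envelope with String key and String payload.
  static final class StringEnvelope implements MessageEnvelope<String, String> {
    private final String key;
    private final String message;

    StringEnvelope(String key, String message) {
      this.key = key;
      this.message = message;
    }

    @Override public String getKey() { return key; }
    @Override public String getMessage() { return message; }
  }

  // Retain only envelopes with a non-empty payload.
  static final FilterFunction<StringEnvelope> NON_EMPTY = m -> !m.getMessage().isEmpty();

  // One-to-one transform: upper-case the payload, keep the key.
  static final MapFunction<StringEnvelope, StringEnvelope> UPPER =
      m -> new StringEnvelope(m.getKey(), m.getMessage().toUpperCase());

  // One-to-many transform: one envelope per space-separated word.
  static final FlatMapFunction<StringEnvelope, StringEnvelope> SPLIT_WORDS = m -> {
    List<StringEnvelope> out = new ArrayList<>();
    for (String word : m.getMessage().split(" ")) {
      out.add(new StringEnvelope(m.getKey(), word));
    }
    return out;
  };

  // Join two envelopes by concatenating their payloads.
  static final JoinFunction<StringEnvelope, StringEnvelope, StringEnvelope> CONCAT =
      (m, other) -> new StringEnvelope(m.getKey(), m.getMessage() + "|" + other.getMessage());

  // Applies the functions by hand; in Samza a MessageStream would invoke them.
  static List<String> demo() {
    List<String> results = new ArrayList<>();
    StringEnvelope in = new StringEnvelope("k1", "hello samza");
    if (NON_EMPTY.apply(in)) {
      for (StringEnvelope word : SPLIT_WORDS.apply(in)) {
        results.add(UPPER.apply(word).getMessage());
      }
    }
    results.add(CONCAT.apply(in, new StringEnvelope("k1", "other")).getMessage());
    return results;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In the sketch the functions are called directly. How the operator implementations in this commit wire them into a stream is not shown here and should be taken from the MessageStream changes in the same diff.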