http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java b/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java deleted file mode 100644 index e557b34..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.WindowOutput; -import org.apache.samza.operators.api.internal.Trigger; -import org.apache.samza.operators.api.internal.Operators; -import org.apache.samza.operators.api.internal.WindowFn; -import org.apache.samza.storage.kv.Entry; - -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be - * used by the user (i.e. programmers) to create {@link Window} function directly. - * - */ -public final class Windows { - - /** - * private constructor to prevent instantiation - */ - private Windows() {} - - /** - * This class defines a session window function class - * - * @param <M> the type of input {@link Message} - * @param <WK> the type of session key in the session window - * @param <WV> the type of output value in each session window - */ - static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> { - - /** - * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows} - * - * @param sessionKeyFunction function to get the session key from the input {@link Message} - * @param aggregator function to calculate the output value based on the input {@link Message} and current output value - */ - private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) { - this.wndKeyFunction = sessionKeyFunction; - this.aggregator = aggregator; - } - - /** - * function to calculate the window key from input message - */ - private final Function<M, WK> wndKeyFunction; - - /** - * function to calculate the output value from the input message and the current output value - */ - private final BiFunction<M, WV, WV> aggregator; - - /** - * trigger condition that determines when to send out the output value in a {@link WindowOutput} message - */ - private Trigger<M, WindowState<WV>> trigger = null; - - //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window} - private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null; - - /** - * Public API methods start here - */ - - /** - * Public API method to define the watermark trigger for the window operator - * - * @param wndTrigger {@link Trigger} function defines the watermark trigger for this {@link SessionWindow} - * @return The window operator w/ the defined watermark trigger - */ - @Override - public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) { - this.trigger = wndTrigger.build(); - return this; - } - - private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { - // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers; - return null; - } - - private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() { - return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() { - - @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { - return SessionWindow.this.getTransformFunc(); - } - - @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() { - return SessionWindow.this.storeFunctions; - } - - @Override public Trigger<M, WindowState<WV>> getTrigger() { - return SessionWindow.this.trigger; - } - }; - } - } - - static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn( - Window<M, WK, WV, WM> window) { - if (window instanceof SessionWindow) { - SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window; - return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn(); - } - throw new IllegalArgumentException("Input window type not supported."); - } - - /** - * Public static API methods start here - * - */ - - /** - * The public programming interface class for window function - * - * @param <M> the type of input {@link Message} - * @param <WK> the type of key to the {@link Window} - * @param <WV> the type of output value in the {@link WindowOutput} - * @param <WM> the type of message in the window output stream - */ - public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> { - - /** - * Set the triggers for this {@link Window} - * - * @param wndTrigger trigger conditions set by the programmers - * @return the {@link Window} function w/ the trigger {@code wndTrigger} - */ - Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger); - } - - /** - * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input messages - * - * @param sessionKeyFunction function to calculate session window key - * @param <M> type of input {@link Message} - * @param <WK> type of the session window key - * @return the {@link Window} function for the session - */ - public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> { c.add(m); return c; }); - } - - /** - * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input messages - * - * @param sessionKeyFunction function to calculate session window key - * @param sessionInfoExtractor function to retrieve session info of type {@code SI} from the input message of type {@code M} - * @param <M> type of the input {@link Message} - * @param <WK> type of the session window key - * @param <SI> type of the session information retrieved from each input message of type {@code M} - * @return the {@link Window} function for the session - */ - public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction, - Function<M, SI> sessionInfoExtractor) { - return new SessionWindow<>(sessionKeyFunction, - (m, c) -> { c.add(sessionInfoExtractor.apply(m)); return c; } ); - } - - /** - * Static API method to create a {@link SessionWindow} as a counter of input messages - * - * @param sessionKeyFunction function to calculate session window key - * @param <M> type of the input {@link Message} - * @param <WK> type of the session window key - * @return the {@link Window} function for the session - */ - public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1); - } - -}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java deleted file mode 100644 index ba74618..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.data; - -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; - - -/** - * This class implements a {@link Message} that encapsulates an {@link IncomingMessageEnvelope} from the system - * - */ -public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> { - /** - * Incoming message envelope - */ - private final IncomingMessageEnvelope imsg; - - /** - * The receive time of this incoming message - */ - private final long recvTimeNano; - - /** - * Ctor to create a {@code IncomingSystemMessage} from {@link IncomingMessageEnvelope} - * - * @param imsg The incoming system message - */ - public IncomingSystemMessage(IncomingMessageEnvelope imsg) { - this.imsg = imsg; - this.recvTimeNano = System.nanoTime(); - } - - @Override - public Object getMessage() { - return this.imsg.getMessage(); - } - - @Override - public Object getKey() { - return this.imsg.getKey(); - } - - @Override - public long getTimestamp() { - return this.recvTimeNano; - } - - @Override - public Offset getOffset() { - // TODO: need to add offset factory to generate different types of offset. This is just a placeholder, - // assuming incoming message carries long value as offset (i.e. Kafka case) - return new LongOffset(this.imsg.getOffset()); - } - - @Override - public SystemStreamPartition getSystemStreamPartition() { - return imsg.getSystemStreamPartition(); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java deleted file mode 100644 index c786025..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.data; - -import org.apache.samza.system.SystemStreamPartition; - - -/** - * This interface defines additional methods a message from an system input should implement, including the methods to - * get {@link SystemStreamPartition} and the {@link Offset} of the input system message. - */ -public interface InputSystemMessage<O extends Offset> { - - /** - * Get the input message's {@link SystemStreamPartition} - * - * @return the {@link SystemStreamPartition} this message is coming from - */ - SystemStreamPartition getSystemStreamPartition(); - - /** - * Get the offset of the message in the input stream. This should be used to uniquely identify a message in an input stream. - * - * @return The offset of the message in the input stream. - */ - O getOffset(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java deleted file mode 100644 index f059b33..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.api.data; - -/** - * An implementation of {@link org.apache.samza.operators.api.data.Offset}, w/ {@code long} value as the offset - */ -public class LongOffset implements Offset { - - /** - * The offset value in {@code long} - */ - private final Long offset; - - private LongOffset(long offset) { - this.offset = offset; - } - - public LongOffset(String offset) { - this.offset = Long.valueOf(offset); - } - - @Override - public int compareTo(Offset o) { - if (!(o instanceof LongOffset)) { - throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName()); - } - LongOffset other = (LongOffset) o; - return this.offset.compareTo(other.offset); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof LongOffset)) { - return false; - } - LongOffset o = (LongOffset) other; - return this.offset.equals(o.offset); - } - - /** - * Helper method to get the minimum offset - * - * @return The minimum offset - */ - public static LongOffset getMinOffset() { - return new LongOffset(Long.MIN_VALUE); - } - - /** - * Helper method to get the maximum offset - * - * @return The maximum offset - */ - public static LongOffset getMaxOffset() { - return new LongOffset(Long.MAX_VALUE); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java deleted file mode 100644 index 9b53b45..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.api.data; - -/** - * This class defines the generic interface of {@link Message}, which is a entry in the input/output stream. - * - * <p>The {@link Message} models the basic operatible unit in streaming SQL processes in Samza. - * - */ -public interface Message<K, M> { - - /** - * Access method to get the corresponding message body in {@link Message} - * - * @return Message object in this {@link Message} - */ - M getMessage(); - - /** - * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key - * - * @return A boolean value indicates whether the current message is a delete or insert message - */ - default boolean isDelete() { return false; }; - - /** - * Access method to the key of the message - * - * @return The key of the message - */ - K getKey(); - - /** - * Get the message creation timestamp of the message. - * - * @return The message's timestamp in nano seconds. - */ - long getTimestamp(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java deleted file mode 100644 index 0fac2c0..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.api.data; - -/** - * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream - */ -public interface Offset extends Comparable<Offset> { - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java deleted file mode 100644 index e9bfe0b..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java +++ /dev/null @@ -1,468 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.api.internal; - -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines all basic stream operator classes used by internal implementation only. All classes defined in - * this file are immutable. - * - * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects - * should be initiated via {@link MessageStream} API methods - */ -public class Operators { - /** - * Private constructor to prevent instantiation of the {@link Operators} class - */ - private Operators() {} - - private static String getOperatorId() { - // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts - return UUID.randomUUID().toString(); - } - - /** - * Private interface for stream operator functions. The interface class defines the output of the stream operator function. - * - */ - public interface Operator<OM extends Message> { - MessageStream<OM> getOutputStream(); - } - - /** - * Linear stream operator function that takes 1 input {@link Message} and output a collection of output {@link Message}s. - * - * @param <M> the type of input {@link Message} - * @param <OM> the type of output {@link Message} - */ - public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> { - /** - * The output {@link MessageStream} - */ - private final MessageStream<OM> outputStream; - - /** - * The transformation function - */ - private final Function<M, Collection<OM>> txfmFunction; - - /** - * Constructor of {@link StreamOperator}. Make it private s.t. it can only be created within {@link Operators}. - * - * @param transformFn the transformation function to be applied that transforms 1 input {@link Message} into a collection - * of output {@link Message}s - */ - private StreamOperator(Function<M, Collection<OM>> transformFn) { - this(transformFn, new MessageStream<>()); - } - - /** - * Constructor of {@link StreamOperator} which allows the user to define the output {@link MessageStream} - * - * @param transformFn the transformation function - * @param outputStream the output {@link MessageStream} - */ - private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) { - this.outputStream = outputStream; - this.txfmFunction = transformFn; - } - - @Override - public MessageStream<OM> getOutputStream() { - return this.outputStream; - } - - /** - * Method to get the transformation function. - * - * @return the {@code txfmFunction} - */ - public Function<M, Collection<OM>> getFunction() { - return this.txfmFunction; - } - - } - - /** - * A sink operator function that allows customized code to send the output to external system. This is the terminal - * operator that does not have any output {@link MessageStream} that allows further processing in the same {@link org.apache.samza.task.StreamOperatorTask} - * - * @param <M> the type of input {@link Message} - */ - public static class SinkOperator<M extends Message> implements Operator { - - /** - * The user-defined sink function - */ - private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink; - - /** - * Default constructor for {@link SinkOperator}. Make it private s.t. it can only be created within {@link Operators}. - * - * @param sink the user-defined sink function - */ - private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { - this.sink = sink; - } - - @Override - public MessageStream getOutputStream() { - return null; - } - - /** - * Method to get the user-defined function implements the {@link SinkOperator} - * - * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector} - * and {@link TaskCoordinator} to the sink function - */ - public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() { - return this.sink; - } - } - - /** - * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve - * buffered messages and partial aggregation results - * - * @param <SK> the type of key used to store the operator states - * @param <SS> the type of operator state. e.g. could be the partial aggregation result for a window, or a buffered - * input message from the join stream for a join - */ - public static class StoreFunctions<M extends Message, SK, SS> { - /** - * Function to define the key to query in the operator state store, according to the incoming {@link Message} - * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping - * windows and unique-key-based join. - * - * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query - * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input - * message to a range of keys in the store. - */ - private final Function<M, SK> storeKeyFinder; - - /** - * Function to update the store entry based on the current state and the incoming {@link Message} - * - * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing sliding/hopping - * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the - * state value. - */ - private final BiFunction<M, SS, SS> stateUpdater; - - /** - * Constructor of state store functions. - * - */ - private StoreFunctions(Function<M, SK> keyFinder, - BiFunction<M, SS, SS> stateUpdater) { - this.storeKeyFinder = keyFinder; - this.stateUpdater = stateUpdater; - } - - /** - * Method to get the {@code storeKeyFinder} function - * - * @return the function to calculate the key from an input {@link Message} - */ - public Function<M, SK> getStoreKeyFinder() { - return this.storeKeyFinder; - } - - /** - * Method to get the {@code stateUpdater} function - * - * @return the function to update the corresponding state according to an input {@link Message} - */ - public BiFunction<M, SS, SS> getStateUpdater() { - return this.stateUpdater; - } - } - - /** - * Defines a window operator function that takes one {@link MessageStream} as an input, accumulate the window state, and generate - * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput} - * - * @param <M> the type of input {@link Message} - * @param <WK> the type of key in the output {@link Message} from the {@link WindowOperator} function - * @param <WS> the type of window state in the {@link WindowOperator} function - * @param <WM> the type of window output {@link Message} - */ - public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> { - /** - * The output {@link MessageStream} - */ - private final MessageStream<WM> outputStream; - - /** - * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window - * state(s) from the window state store, and generate output {@link Message}s to the output stream. - */ - private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction; - - /** - * The state store functions for the {@link WindowOperator} - */ - private final StoreFunctions<M, WK, WS> storeFunctions; - - /** - * The window trigger function - */ - private final Trigger<M, WS> trigger; - - /** - * The unique ID of stateful operators - */ - private final String opId; - - /** - * Constructor for {@link WindowOperator}. Make it private s.t. it can only be created within {@link Operators}. - * - * @param windowFn description of the window function - * @param operatorId auto-generated unique ID of the operator - */ - private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) { - this.outputStream = new MessageStream<>(); - this.txfmFunction = windowFn.getTransformFunc(); - this.storeFunctions = windowFn.getStoreFuncs(); - this.trigger = windowFn.getTrigger(); - this.opId = operatorId; - } - - @Override - public String toString() { - return this.opId; - } - - @Override - public MessageStream<WM> getOutputStream() { - return this.outputStream; - } - - /** - * Method to get the window's {@link StoreFunctions}. - * - * @return the window operator's {@code storeFunctions} - */ - public StoreFunctions<M, WK, WS> getStoreFunctions() { - return this.storeFunctions; - } - - /** - * Method to get the window operator's main function - * - * @return the window operator's {@code txfmFunction} - */ - public BiFunction<M, Entry<WK, WS>, WM> getFunction() { - return this.txfmFunction; - } - - /** - * Method to get the trigger functions - * - * @return the {@link Trigger} for this {@link WindowOperator} - */ - public Trigger<M, WS> getTrigger() { - return this.trigger; - } - - /** - * Method to generate the window operator's state store name - * - * @param inputStream the input {@link MessageStream} to this state store - * @return the persistent store name of the window operator - */ - public String getStoreName(MessageStream<M> inputStream) { - //TODO: need to get the persistent name of ds and the operator in a serialized form - return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString()); - } - } - - /** - * The partial join operator that takes {@link Message}s from one input stream and join w/ buffered {@link Message}s from - * another stream and generate join output to {@code output} - * - * @param <M> the type of input {@link Message} - * @param <K> the type of join key - * @param <JM> the type of message of {@link Message} in the other join stream - * @param <RM> the type of message of {@link Message} in the join output stream - */ - public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> { - - private final MessageStream<RM> joinOutput; - - /** - * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message, - * join w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generate joined type {@code RM}. - */ - private final BiFunction<M, JM, RM> txfmFunction; - - /** - * The message store functions that read the buffered messages from the other stream in the join - */ - private final StoreFunctions<JM, K, JM> joinStoreFunctions; - - /** - * The message store functions that save the buffered messages of this {@link MessageStream} in the join - */ - private final StoreFunctions<M, K, M> selfStoreFunctions; - - /** - * The unique ID for the stateful operator - */ - private final String opId; - - /** - * Default constructor to create a {@link PartialJoinOperator} object - * - * @param partialJoin partial join function that take type {@code M} of input {@link Message} and join w/ type - * {@code JM} of buffered {@link Message} from another stream - * @param joinOutput the output {@link MessageStream} of the join results - */ - private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) { - this.joinOutput = joinOutput; - this.txfmFunction = partialJoin; - // Read-only join store, no creator/updater functions specified - this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null); - // Buffered message store for this input stream - this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m); - this.opId = opId; - } - - @Override - public String toString() { - return this.opId; - } - - @Override - public MessageStream<RM> getOutputStream() { - return this.joinOutput; - } - - /** - * Method to get {@code joinStoreFunctions} - * - * @return {@code joinStoreFunctions} - */ - public StoreFunctions<JM, K, JM> getJoinStoreFunctions() { - return this.joinStoreFunctions; - } - - /** - * Method to get {@code selfStoreFunctions} - * - * @return {@code selfStoreFunctions} - */ - public StoreFunctions<M, K, M> getSelfStoreFunctions() { - return this.selfStoreFunctions; - } - - /** - * Method to get {@code txfmFunction} - * - * @return {@code txfmFunction} - */ - public BiFunction<M, JM, RM> getFunction() { - return this.txfmFunction; - } - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} - * - * @param transformFn the corresponding transformation function - * @param <M> type of input {@link Message} - * @param <OM> type of output {@link Message} - * @return the {@link StreamOperator} - */ - public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) { - return new StreamOperator<>(transformFn); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator} - * - * @param sinkFn the sink function - * @param <M> type of input {@link Message} - * @return the {@link SinkOperator} - */ - public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) { - return new SinkOperator<>(sinkFn); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator} - * - * @param windowFn the {@link WindowFn} function - * @param <M> type of input {@link Message} - * @param <WK> type of window key - * @param <WS> type of {@link WindowState} - * @param <WM> type of output {@link WindowOutput} - * @return the {@link WindowOperator} - */ - public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator( - WindowFn<M, WK, WS, WM> windowFn) { - return new WindowOperator<>(windowFn, Operators.getOperatorId()); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator} - * - * @param joiner the {@link WindowFn} function - * @param joinOutput the output {@link MessageStream} - * @param <M> type of input {@link Message} - * @param <K> type of join key - * @param <JM> the type of message in the {@link Message} from the other join stream - * @param <RM> the type of message in the {@link Message} from the join function - * @return the {@link PartialJoinOperator} - */ - public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator( - BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) { - return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId()); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function - * - * @param mergeOutput the common output {@link MessageStream} from the merger - * @param <M> the type of input {@link Message} - * @return the {@link StreamOperator} for merge - */ - public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) { - return new StreamOperator<M, M>(t -> - new ArrayList<M>() {{ - this.add(t); - }}, - mergeOutput); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java deleted file mode 100644 index 33a0134..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.internal; - -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; - -import java.util.function.BiFunction; -import java.util.function.Function; - -/** - * Defines the trigger functions for {@link Operators.WindowOperator}. This class is immutable. - * - * @param <M> the type of message from the input stream - * @param <S> the type of state variable in the window's state store - */ -public class Trigger<M extends Message, S extends WindowState> { - - /** - * System timer based trigger condition. This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward - */ - private final Function<S, Boolean> timerTrigger; - - /** - * early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator} - */ - private final BiFunction<M, S, Boolean> earlyTrigger; - - /** - * late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator} - */ - private final BiFunction<M, S, Boolean> lateTrigger; - - /** - * the function to updated the window state when the first output is triggered - */ - private final Function<S, S> earlyTriggerUpdater; - - /** - * the function to updated the window state when the late output is triggered - */ - private final Function<S, S> lateTriggerUpdater; - - /** - * Private constructor to prevent instantiation - * - * @param timerTrigger system timer trigger condition - * @param earlyTrigger early trigger condition - * @param lateTrigger late trigger condition - * @param earlyTriggerUpdater early trigger state updater - * @param lateTriggerUpdater late trigger state updater - */ - private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, - Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) { - this.timerTrigger = timerTrigger; - this.earlyTrigger = earlyTrigger; - this.lateTrigger = lateTrigger; - this.earlyTriggerUpdater = earlyTriggerUpdater; - this.lateTriggerUpdater = lateTriggerUpdater; - } - - /** - * Static method to create a {@link Trigger} object - * - * @param timerTrigger system timer trigger condition - * @param earlyTrigger early trigger condition - * @param lateTrigger late trigger condition - * @param earlyTriggerUpdater early trigger state updater - * @param lateTriggerUpdater late trigger state updater - * @param <M> the type of input {@link Message} - * @param <S> the type of window state extends {@link WindowState} - * @return the {@link Trigger} function - */ - public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger, - BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater, - Function<S, S> lateTriggerUpdater) { - return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java deleted file mode 100644 index 1fd88e7..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.internal; - -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.storage.kv.Entry; - -import java.util.function.BiFunction; - - -/** - * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used - * by the internal representation and implementation classes in operators. - * - * @param <M> type of input stream {@link Message} for window - * @param <WK> type of window key in the output {@link Message} - * @param <WS> type of {@link WindowState} variable in the state store - * @param <WM> type of the message in the output stream - */ -public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> { - - /** - * get the transformation function of the {@link WindowFn} - * - * @return the transformation function takes type {@code M} message and the window state entry, then transform to an {@link WindowOutput} - */ - BiFunction<M, Entry<WK, WS>, WM> getTransformFunc(); - - /** - * get the state store functions for this {@link WindowFn} - * - * @return the collection of state store methods - */ - Operators.StoreFunctions<M, WK, WS> getStoreFuncs(); - - /** - * get the trigger conditions for this {@link WindowFn} - * - * @return the trigger condition for the {@link WindowFn} function - */ - Trigger<M, WS> getTrigger(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java deleted file mode 100644 index e202c20..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.internal; - -import org.apache.samza.operators.api.data.Message; - - -/** - * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function - * - * @param <K> the type of key in the output window result - * @param <M> the type of value in the output window result - */ -public final class WindowOutput<K, M> implements Message<K, M> { - private final K key; - private final M value; - - WindowOutput(K key, M aggregated) { - this.key = key; - this.value = aggregated; - } - - @Override public M getMessage() { - return this.value; - } - - @Override public K getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return 0; - } - - static public <K, M> WindowOutput<K, M> of(K key, M result) { - return new WindowOutput<>(key, result); - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java index 59de16b..82f3c28 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java @@ -19,9 +19,9 @@ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.Operator; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.Operator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java index f16cbc6..d3d8f8b 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java @@ -19,10 +19,10 @@ package org.apache.samza.operators.impl; import org.apache.commons.collections.keyvalue.AbstractMapEntry; -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.*; -import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.*; +import org.apache.samza.operators.internal.WindowOutput; import org.apache.samza.operators.impl.join.PartialJoinOpImpl; import org.apache.samza.operators.impl.window.SessionWindowImpl; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 3ca8bde..f55c758 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -18,8 +18,8 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.MessageStream; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -90,7 +90,7 @@ public abstract class OperatorImpl<M extends Message, RM extends Message> protected void init(MessageStream<M> source, TaskContext context) {}; /** - * Method to trigger all downstream operators that consumes the output {@link org.apache.samza.operators.api.MessageStream} + * Method to trigger all downstream operators that consumes the output {@link MessageStream} * from this operator * * @param omsg output {@link Message} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java index 5a375bc..cc7ef2b 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java @@ -18,7 +18,7 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.data.Message; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java index b29d9c8..b0f4f27 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java @@ -18,8 +18,8 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.StreamOperator; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.StreamOperator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index 5d25cfa..a8a639e 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -18,9 +18,9 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.internal.Operators.SinkOperator; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.internal.Operators.SinkOperator; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.Message; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java index f573fd0..7840b5b 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java @@ -18,8 +18,8 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.StoreFunctions; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.StoreFunctions; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.TaskContext; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java index bbe08a4..f4a6a58 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java @@ -18,19 +18,20 @@ */ package org.apache.samza.operators.impl.join; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators; +import org.apache.samza.operators.internal.Operators.PartialJoinOperator; import org.apache.samza.operators.impl.OperatorImpl; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; /** - * Implementation of a {@link org.apache.samza.operators.api.internal.Operators.PartialJoinOperator}. This class implements function + * Implementation of a {@link Operators.PartialJoinOperator}. This class implements function * that only takes in one input stream among all inputs to the join and generate the join output. * - * @param <M> Type of input stream {@link org.apache.samza.operators.api.data.Message} - * @param <RM> Type of join output stream {@link org.apache.samza.operators.api.data.Message} + * @param <M> Type of input stream {@link Message} + * @param <RM> Type of join output stream {@link Message} */ public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> { http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java index 59e2dec..0d6141e 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java @@ -18,11 +18,11 @@ */ package org.apache.samza.operators.impl.window; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.WindowOperator; -import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.WindowOperator; +import org.apache.samza.operators.internal.WindowOutput; import org.apache.samza.operators.impl.OperatorImpl; import org.apache.samza.operators.impl.StateStoreImpl; import org.apache.samza.storage.kv.Entry; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java index e340fe8..18b077b 100644 --- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java +++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java @@ -19,11 +19,12 @@ package org.apache.samza.task; import org.apache.samza.config.Config; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.MessageStreams; -import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.api.data.IncomingSystemMessage; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreams; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.data.IncomingSystemMessage; import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.operators.task.StreamOperatorTask; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; @@ -45,7 +46,7 @@ public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask /** * Wrapped {@link StreamOperatorTask} class */ - private final StreamOperatorTask userTask; + private final StreamOperatorTask userTask; /** * Constructor that wraps the user-defined {@link StreamOperatorTask} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java deleted file mode 100644 index cfdb694..0000000 --- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.task; - -import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; -import java.util.Collection; - -/** - * This interface defines the methods that user needs to implement via the operator programming APIs. - */ -public interface StreamOperatorTask { - - /** - * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s. - * Users have to implement this function to instantiate {@link org.apache.samza.operators.impl.ChainedOperators} that - * will process each incoming {@link SystemMessageStream}. - * - * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition} - * - * @param sources the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.api.data.IncomingSystemMessage} - * from a {@link org.apache.samza.system.SystemStreamPartition} - */ - void initOperators(Collection<SystemMessageStream> sources); - - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java deleted file mode 100644 index 0f00fdb..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.data.Message; - - -public class TestMessage implements Message<String, String> { - - private final String key; - private final String value; - private final long timestamp; - - TestMessage(String key, String value, long timestamp) { - this.key = key; - this.value = value; - this.timestamp = timestamp; - } - - @Override public String getMessage() { - return this.value; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java deleted file mode 100644 index 9f9ad6b..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.internal.Operators.*; -import org.apache.samza.operators.api.internal.WindowOutput; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestMessageStream { - - @Test public void testMap() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2); - MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestOutputMessage> mapOp = subs.iterator().next(); - assertTrue(mapOp instanceof StreamOperator); - assertEquals(mapOp.getOutputStream(), outputStream); - // assert that the transformation function is what we defined above - TestMessage xTestMsg = mock(TestMessage.class); - when(xTestMsg.getKey()).thenReturn("test-msg-key"); - when(xTestMsg.getMessage()).thenReturn("123456789"); - when(xTestMsg.getTimestamp()).thenReturn(12345L); - Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg); - assertEquals(cOutputMsg.size(), 1); - TestOutputMessage outputMessage = cOutputMsg.iterator().next(); - assertEquals(outputMessage.getKey(), xTestMsg.getKey()); - assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1)); - assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2); - } - - @Test public void testFlatMap() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() {{ - this.add(mock(TestOutputMessage.class)); - this.add(mock(TestOutputMessage.class)); - this.add(mock(TestOutputMessage.class)); - }}; - Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts; - MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestOutputMessage> flatMapOp = subs.iterator().next(); - assertTrue(flatMapOp instanceof StreamOperator); - assertEquals(flatMapOp.getOutputStream(), outputStream); - // assert that the transformation function is what we defined above - assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap); - } - - @Test public void testFilter() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L; - MessageStream<TestMessage> outputStream = inputStream.filter(xFilter); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> filterOp = subs.iterator().next(); - assertTrue(filterOp instanceof StreamOperator); - assertEquals(filterOp.getOutputStream(), outputStream); - // assert that the transformation function is what we defined above - Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction(); - TestMessage mockMsg = mock(TestMessage.class); - when(mockMsg.getTimestamp()).thenReturn(11111L); - Collection<TestMessage> output = txfmFn.apply(mockMsg); - assertTrue(output.isEmpty()); - when(mockMsg.getTimestamp()).thenReturn(999999L); - output = txfmFn.apply(mockMsg); - assertEquals(output.size(), 1); - assertEquals(output.iterator().next(), mockMsg); - } - - @Test public void testSink() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> { - mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); - tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); - }; - inputStream.sink(xSink); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> sinkOp = subs.iterator().next(); - assertTrue(sinkOp instanceof SinkOperator); - assertEquals(((SinkOperator) sinkOp).getFunction(), xSink); - assertNull(((SinkOperator) sinkOp).getOutputStream()); - } - - @Test public void testWindow() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class); - MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> wndOp = subs.iterator().next(); - assertTrue(wndOp instanceof WindowOperator); - assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream); - } - - @Test public void testJoin() { - MessageStream<TestMessage> source1 = new MessageStream<>(); - MessageStream<TestMessage> source2 = new MessageStream<>(); - BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp()); - MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner); - Collection<Operator> subs = source1.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> joinOp1 = subs.iterator().next(); - assertTrue(joinOp1 instanceof PartialJoinOperator); - assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput); - subs = source2.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> joinOp2 = subs.iterator().next(); - assertTrue(joinOp2 instanceof PartialJoinOperator); - assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput); - TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L); - TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L); - TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2); - assertEquals(xOut.getKey(), "test-join-1"); - assertEquals(xOut.getMessage(), Integer.valueOf(24)); - assertEquals(xOut.getTimestamp(), 11111L); - xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1); - assertEquals(xOut.getKey(), "test-join-1"); - assertEquals(xOut.getMessage(), Integer.valueOf(24)); - assertEquals(xOut.getTimestamp(), 11111L); - } - - @Test public void testMerge() { - MessageStream<TestMessage> merge1 = new MessageStream<>(); - Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>(){{ - this.add(new MessageStream<>()); - this.add(new MessageStream<>()); - }}; - MessageStream<TestMessage> mergeOutput = merge1.merge(others); - validateMergeOperator(merge1, mergeOutput); - - others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); - } - - private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) { - Collection<Operator> subs = mergeSource.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> mergeOp = subs.iterator().next(); - assertTrue(mergeOp instanceof StreamOperator); - assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput); - TestMessage mockMsg = mock(TestMessage.class); - Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg); - assertEquals(outputs.size(), 1); - assertEquals(outputs.iterator().next(), mockMsg); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java deleted file mode 100644 index e6aa692..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.Partition; -import org.apache.samza.system.SystemStreamPartition; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - - -public class TestMessageStreams { - - @Test public void testInput() { - SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0)); - MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp); - assertEquals(mSysStream.getSystemStreamPartition(), ssp); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java deleted file mode 100644 index 225e77f..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.data.Message; - - -public class TestOutputMessage implements Message<String, Integer> { - private final String key; - private final Integer value; - private final long timestamp; - - public TestOutputMessage(String key, Integer value, long timestamp) { - this.key = key; - this.value = value; - this.timestamp = timestamp; - } - - @Override public Integer getMessage() { - return this.value; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } -} -
