http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index efa6a96..c77914e 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -18,88 +18,60 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.Message; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; -import org.reactivestreams.Processor; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import java.util.HashSet; import java.util.Set; /** - * Abstract base class for all stream operator implementation classes. + * Abstract base class for all stream operator implementations. */ -public abstract class OperatorImpl<M extends Message, RM extends Message> - implements Processor<ProcessorContext<M>, ProcessorContext<RM>> { +public abstract class OperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> { - private final Set<Subscriber<? super ProcessorContext<RM>>> subscribers = new HashSet<>(); - - @Override public void subscribe(Subscriber<? 
super ProcessorContext<RM>> s) { - // Only add once - subscribers.add(s); - } - - @Override public void onSubscribe(Subscription s) { - - } - - @Override public void onNext(ProcessorContext<M> o) { - - onNext(o.getMessage(), o.getCollector(), o.getCoordinator()); - } - - @Override public void onError(Throwable t) { - - } - - @Override public void onComplete() { + private final Set<OperatorImpl<RM, ? extends MessageEnvelope>> nextOperators = new HashSet<>(); + /** + * Register the next operator in the chain that this operator should propagate its output to. + * @param nextOperator the next operator in the chain. + */ + void registerNextOperator(OperatorImpl<RM, ? extends MessageEnvelope> nextOperator) { + nextOperators.add(nextOperator); } /** - * Default method for timer event + * Initialize the initial state for stateful operators. * - * @param nanoTime the system nano-second when the timer event is triggered - * @param collector the {@link MessageCollector} in the context - * @param coordinator the {@link TaskCoordinator} in the context + * @param source the source that this {@link OperatorImpl} operator is registered with + * @param context the task context to initialize the operator implementation */ - public void onTimer(long nanoTime, MessageCollector collector, TaskCoordinator coordinator) { - this.subscribers.forEach(sub -> ((OperatorImpl) sub).onTimer(nanoTime, collector, coordinator)); - } + public void init(MessageStream<M> source, TaskContext context) {} /** - * Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers. + * Perform the transformation required for this operator and call the downstream operators. + * + * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly. 
* - * @param message the input {@link Message} + * @param message the input {@link MessageEnvelope} * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - protected abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); + public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); /** - * Stateful operators will need to override this method to initialize the operators + * Helper method to propagate the output of this operator to all registered downstream operators. * - * @param source the source that this {@link OperatorImpl} object subscribe to - * @param context the task context to initialize the operators within - */ - protected void init(MessageStream<M> source, TaskContext context) {}; - - /** - * Method to trigger all downstream operators that consumes the output {@link MessageStream} - * from this operator + * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly. * - * @param omsg output {@link Message} + * @param outputMessage output {@link MessageEnvelope} * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - protected void nextProcessors(RM omsg, MessageCollector collector, TaskCoordinator coordinator) { - subscribers.forEach(sub -> - sub.onNext(new ProcessorContext<>(omsg, collector, coordinator)) - ); + void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { + nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); } }
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java new file mode 100644 index 0000000..79446be --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.windows.WindowOutput; +import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.task.TaskContext; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a + * {@link MessageStreamImpl} + */ +public class OperatorImpls { + + /** + * Holds the mapping between the {@link OperatorSpec} and {@link OperatorImpl}s instances. + */ + private static final Map<OperatorSpec, OperatorImpl> OPERATOR_IMPLS = new ConcurrentHashMap<>(); + + /** + * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl}, + * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node. + * + * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for + * @param <M> the type of {@link MessageEnvelope}s in the {@code source} {@link MessageStream} + * @param context the {@link TaskContext} required to instantiate operators + * @return root node for the {@link OperatorImpl} DAG + */ + public static <M extends MessageEnvelope> RootOperatorImpl createOperatorImpls(MessageStreamImpl<M> source, TaskContext context) { + // since the source message stream might have multiple operator specs registered on it, + // create a new root node as a single point of entry for the DAG. 
+ RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>(); + // create the pipeline/topology starting from the source + source.getRegisteredOperatorSpecs().forEach(registeredOperator -> { + // pass in the source and context s.t. stateful stream operators can initialize their stores + OperatorImpl<M, ? extends MessageEnvelope> operatorImpl = + createAndRegisterOperatorImpl(registeredOperator, source, context); + rootOperator.registerNextOperator(operatorImpl); + }); + return rootOperator; + } + + /** + * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding + * {@link OperatorImpl}s. + * + * @param operatorSpec the operatorSpec registered with the {@code source} + * @param source the source {@link MessageStreamImpl} + * @param context the context of the task + * @return the operator implementation for the operatorSpec + */ + private static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createAndRegisterOperatorImpl(OperatorSpec operatorSpec, + MessageStream source, TaskContext context) { + if (!OPERATOR_IMPLS.containsKey(operatorSpec)) { + OperatorImpl<M, ? extends MessageEnvelope> operatorImpl = createOperatorImpl(operatorSpec); + if (OPERATOR_IMPLS.putIfAbsent(operatorSpec, operatorImpl) == null) { + // this is the first time we've added the operatorImpl corresponding to the operatorSpec, + // so traverse and initialize and register the rest of the DAG. + MessageStream<? 
extends MessageEnvelope> outStream = operatorSpec.getOutputStream(); + Collection<OperatorSpec> registeredSpecs = ((MessageStreamImpl) outStream).getRegisteredOperatorSpecs(); + registeredSpecs.forEach(registeredSpec -> { + OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, outStream, context); + operatorImpl.registerNextOperator(subImpl); + }); + operatorImpl.init(source, context); + return operatorImpl; + } + } + + // the implementation corresponding to operatorSpec has already been instantiated + // and registered, so we do not need to traverse the DAG further. + return OPERATOR_IMPLS.get(operatorSpec); + } + + /** + * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. + * + * @param operatorSpec the immutable {@link OperatorSpec} definition. + * @param <M> type of input {@link MessageEnvelope} + * @return the {@link OperatorImpl} implementation instance + */ + protected static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createOperatorImpl(OperatorSpec operatorSpec) { + if (operatorSpec instanceof StreamOperatorSpec) { + return new StreamOperatorImpl<>((StreamOperatorSpec<M, ? extends MessageEnvelope>) operatorSpec); + } else if (operatorSpec instanceof SinkOperatorSpec) { + return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec); + } else if (operatorSpec instanceof WindowOperatorSpec) { + return new SessionWindowOperatorImpl<>((WindowOperatorSpec<M, ?, ? extends WindowState, ? 
extends WindowOutput>) operatorSpec); + } else if (operatorSpec instanceof PartialJoinOperatorSpec) { + return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec); + } + throw new IllegalArgumentException( + String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java new file mode 100644 index 0000000..90569b4 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation of a {@link PartialJoinOperatorSpec}. This class implements a function + * that takes in only one input stream among all inputs to the join and generates the join output. + * + * @param <M> type of {@link MessageEnvelope}s in the input stream + * @param <JM> type of {@link MessageEnvelope}s in the stream to join with + * @param <RM> type of {@link MessageEnvelope}s in the joined stream + */ +class PartialJoinOperatorImpl<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> + extends OperatorImpl<M, RM> { + + PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp) { + // TODO: implement PartialJoinOperatorImpl constructor + } + + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + // TODO: implement PartialJoinOperatorImpl processing logic + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java deleted file mode 100644 index cc7ef2b..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.operators.data.Message; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Wrapper class to be used by {@link OperatorImpl} - * - * @param <M> Type of input stream {@link Message} - */ -public class ProcessorContext<M extends Message> { - private final M message; - private final MessageCollector collector; - private final TaskCoordinator coordinator; - - ProcessorContext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.message = message; - this.collector = collector; - this.coordinator = coordinator; - } - - M getMessage() { - return this.message; - } - - MessageCollector getCollector() { - return this.collector; - } - - TaskCoordinator getCoordinator() { - return this.coordinator; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java new file mode 100644 index 0000000..7132b86 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java @@ -0,0 
+1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * A no-op operator implementation that forwards incoming {@link MessageEnvelope}s to all of its subscribers. 
+ * @param <M> type of incoming {@link MessageEnvelope}s + */ +final class RootOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, M> { + + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.propagateResult(message, collector, coordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java new file mode 100644 index 0000000..e8a635c --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StateStoreImpl; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.operators.windows.WindowOutput; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Default implementation class of a {@link WindowOperatorSpec} for a session window. + * + * @param <M> the type of input {@link MessageEnvelope} + * @param <RK> the type of window key + * @param <WS> the type of window state + * @param <RM> the type of aggregated value of the window + */ +class SessionWindowOperatorImpl<M extends MessageEnvelope, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> + extends OperatorImpl<M, RM> { + + private final WindowOperatorSpec<M, RK, WS, RM> windowSpec; + private StateStoreImpl<M, RK, WS> stateStore = null; + + SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WS, RM> windowSpec) { + this.windowSpec = windowSpec; + } + + @Override + public void init(MessageStream<M> source, TaskContext context) { + this.stateStore = new StateStoreImpl<>(this.windowSpec.getStoreFns(), windowSpec.getStoreName(source)); + this.stateStore.init(context); + } + + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + Entry<RK, WS> state = this.stateStore.getState(message); + this.propagateResult(this.windowSpec.getTransformFn().apply(message, state), collector, coordinator); + this.stateStore.updateState(message, state); + } + + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + // This is to periodically check the timeout triggers to get the list of window states to be updated + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java deleted file mode 100644 index b0f4f27..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.impl; - -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators.StreamOperator; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - -import java.util.Collection; -import java.util.function.Function; - - -/** - * Base class for all implementation of operators - * - * @param <M> type of message in the input stream - * @param <RM> type of message in the output stream - */ -public class SimpleOperatorImpl<M extends Message, RM extends Message> extends OperatorImpl<M, RM> { - - private final Function<M, Collection<RM>> transformFn; - - SimpleOperatorImpl(StreamOperator<M, RM> op) { - super(); - this.transformFn = op.getFunction(); - } - - @Override protected void onNext(M imsg, MessageCollector collector, TaskCoordinator coordinator) { - // actually calling the transform function and then for each output, call nextProcessors() - this.transformFn.apply(imsg).forEach(r -> this.nextProcessors(r, collector, coordinator)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index a8a639e..abed03f 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -18,24 +18,26 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.internal.Operators.SinkOperator; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.data.MessageEnvelope; +import 
org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; /** - * Implementation for {@link SinkOperator} + * Implementation for {@link SinkOperatorSpec} */ -public class SinkOperatorImpl<M extends Message> extends OperatorImpl<M, Message> { - private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFunc; +class SinkOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, MessageEnvelope> { - SinkOperatorImpl(SinkOperator<M> sinkOp) { - this.sinkFunc = sinkOp.getFunction(); + private final SinkFunction<M> sinkFn; + + SinkOperatorImpl(SinkOperatorSpec<M> sinkOp) { + this.sinkFn = sinkOp.getSinkFn(); } - @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.sinkFunc.apply(message, collector, coordinator); + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.sinkFn.apply(message, collector, coordinator); } } http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java deleted file mode 100644 index 7840b5b..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators.StoreFunctions; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.task.TaskContext; - - -/** - * The base class for all state stores - */ -public class StateStoreImpl<M extends Message, SK, SS> { - private final String storeName; - private final StoreFunctions<M, SK, SS> storeFunctions; - private KeyValueStore<SK, SS> kvStore = null; - - public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) { - this.storeFunctions = store; - this.storeName = storeName; - } - - public void init(TaskContext context) { - this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName); - } - - public Entry<SK, SS> getState(M m) { - SK key = this.storeFunctions.getStoreKeyFinder().apply(m); - SS state = this.kvStore.get(key); - return new Entry<>(key, state); - } - - public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) { - SS newValue = this.storeFunctions.getStateUpdater().apply(m, oldEntry.getValue()); - this.kvStore.put(oldEntry.getKey(), newValue); - return new Entry<>(oldEntry.getKey(), newValue); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java 
---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java new file mode 100644 index 0000000..3a5c56e --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * A StreamOperator that accepts a 1:n transform function and applies it to each incoming {@link MessageEnvelope}. 
+ * + * @param <M> type of {@link MessageEnvelope} in the input stream + * @param <RM> type of {@link MessageEnvelope} in the output stream + */ +class StreamOperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> extends OperatorImpl<M, RM> { + + private final FlatMapFunction<M, RM> transformFn; + + StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec) { + this.transformFn = streamOperatorSpec.getTransformFn(); + } + + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + // call the transform function and then for each output call propagateResult() + this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java deleted file mode 100644 index 4238d45..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl.join; - -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators.PartialJoinOperator; -import org.apache.samza.operators.impl.OperatorImpl; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Implementation of a {@link PartialJoinOperator}. This class implements function - * that only takes in one input stream among all inputs to the join and generate the join output. - * - * @param <M> Type of input stream {@link Message} - * @param <RM> Type of join output stream {@link Message} - */ -public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> { - - public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) { - // TODO: implement PartialJoinOpImpl constructor - } - - @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - // TODO: implement PartialJoinOpImpl processing logic - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java deleted file mode 100644 index 0d6141e..0000000 --- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl.window; - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators.WindowOperator; -import org.apache.samza.operators.internal.WindowOutput; -import org.apache.samza.operators.impl.OperatorImpl; -import org.apache.samza.operators.impl.StateStoreImpl; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Default implementation class of a {@link WindowOperator} for a session window. 
- * - * @param <M> the type of input {@link Message} - * @param <RK> the type of window key - * @param <WS> the type of window state - * @param <RM> the type of aggregated value of the window - */ -public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> extends - OperatorImpl<M, RM> { - private final WindowOperator<M, RK, WS, RM> sessWnd; - private StateStoreImpl<M, RK, WS> wndStore = null; - - public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) { - this.sessWnd = sessWnd; - } - - @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - Entry<RK, WS> state = this.wndStore.getState(message); - this.nextProcessors(this.sessWnd.getFunction().apply(message, state), collector, coordinator); - this.wndStore.updateState(message, state); - } - - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - // This is to periodically check the timeout triggers to get the list of window states to be updated - } - - @Override protected void init(MessageStream<M> source, TaskContext context) { - this.wndStore = new StateStoreImpl<>(this.sessWnd.getStoreFunctions(), sessWnd.getStoreName(source)); - this.wndStore.init(context); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java new file mode 100644 index 0000000..8b75cdc --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.MessageEnvelope; + + +/** + * A stateless serializable stream operator specification that holds all the information required + * to transform the input {@link MessageStream} and produce the output {@link MessageStream}. + */ +public interface OperatorSpec<OM extends MessageEnvelope> { + + /** + * Get the output stream containing transformed {@link MessageEnvelope} produced by this operator. + * @return the output stream containing transformed {@link MessageEnvelope} produced by this operator. 
+ */ + MessageStream<OM> getOutputStream(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java new file mode 100644 index 0000000..f622b34 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.operators.windows.WindowFn; +import org.apache.samza.operators.windows.WindowOutput; + +import java.util.ArrayList; +import java.util.UUID; +import java.util.function.BiFunction; + + +/** + * Factory methods for creating {@link OperatorSpec} instances. + */ +public class OperatorSpecs { + + private OperatorSpecs() {} + + private static String getOperatorId() { + // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts + return UUID.randomUUID().toString(); + } + + /** + * Creates a {@link StreamOperatorSpec}. + * + * @param transformFn the transformation function + * @param <M> type of input {@link MessageEnvelope} + * @param <OM> type of output {@link MessageEnvelope} + * @return the {@link StreamOperatorSpec} + */ + public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperator( + FlatMapFunction<M, OM> transformFn) { + return new StreamOperatorSpec<>(transformFn); + } + + /** + * Creates a {@link SinkOperatorSpec}. + * + * @param sinkFn the sink function + * @param <M> type of input {@link MessageEnvelope} + * @return the {@link SinkOperatorSpec} + */ + public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperator(SinkFunction<M> sinkFn) { + return new SinkOperatorSpec<>(sinkFn); + } + + /** + * Creates a {@link WindowOperatorSpec}. 
+ * + * @param windowFn the {@link WindowFn} function + * @param <M> type of input {@link MessageEnvelope} + * @param <WK> type of window key + * @param <WS> type of {@link WindowState} + * @param <WM> type of output {@link WindowOutput} + * @return the {@link WindowOperatorSpec} + */ + public static <M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperatorSpec<M, WK, WS, WM> createWindowOperator( + WindowFn<M, WK, WS, WM> windowFn) { + return new WindowOperatorSpec<>(windowFn, OperatorSpecs.getOperatorId()); + } + + /** + * Creates a {@link PartialJoinOperatorSpec}. + * + * @param partialJoinFn the join function + * @param joinOutput the output {@link MessageStreamImpl} + * @param <M> type of input {@link MessageEnvelope} + * @param <K> type of join key + * @param <JM> the type of {@link MessageEnvelope} in the other join stream + * @param <OM> the type of {@link MessageEnvelope} in the join output + * @return the {@link PartialJoinOperatorSpec} + */ + public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperator( + BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) { + return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, OperatorSpecs.getOperatorId()); + } + + /** + * Creates a {@link StreamOperatorSpec} with a merger function. 
+ * + * @param mergeOutput the output {@link MessageStreamImpl} from the merger + * @param <M> the type of input {@link MessageEnvelope} + * @return the {@link StreamOperatorSpec} for the merge + */ + public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperator(MessageStreamImpl<M> mergeOutput) { + return new StreamOperatorSpec<M, M>(t -> + new ArrayList<M>() { { + this.add(t); + } }, + mergeOutput); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java new file mode 100644 index 0000000..f74f35d --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.windows.StoreFunctions; + +import java.util.function.BiFunction; + + +/** + * Spec for the partial join operator that takes {@link MessageEnvelope}s from one input stream, joins with buffered + * {@link MessageEnvelope}s from another stream, and produces join results to an output {@link MessageStreamImpl}. + * + * @param <M> the type of input {@link MessageEnvelope} + * @param <K> the type of join key + * @param <JM> the type of {@link MessageEnvelope} in the other join stream + * @param <RM> the type of {@link MessageEnvelope} in the join output stream + */ +public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> + implements OperatorSpec<RM> { + + private final MessageStreamImpl<RM> joinOutput; + + /** + * The transformation function of {@link PartialJoinOperatorSpec} that takes an input {@link MessageEnvelope} of + * type {@code M}, joins with a stream of buffered {@link MessageEnvelope}s of type {@code JM} from another stream, + * and generates a joined result {@link MessageEnvelope} of type {@code RM}. + */ + private final BiFunction<M, JM, RM> transformFn; + + /** + * The {@link MessageEnvelope} store functions that read the buffered {@link MessageEnvelope}s from the other + * stream in the join. + */ + private final StoreFunctions<JM, K, JM> joinStoreFns; + + /** + * The {@link MessageEnvelope} store functions that save the buffered {@link MessageEnvelope} of this + * {@link MessageStreamImpl} in the join. + */ + private final StoreFunctions<M, K, M> selfStoreFns; + + /** + * The unique ID for this operator. + */ + private final String operatorId; + + /** + * Default constructor for a {@link PartialJoinOperatorSpec}. 
+ * + * @param partialJoinFn partial join function that takes an input {@link MessageEnvelope} of type {@code M} and joins it + * with buffered {@link MessageEnvelope}s of type {@code JM} from another stream + * @param joinOutput the output {@link MessageStreamImpl} of the join results + */ + PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, String operatorId) { + this.joinOutput = joinOutput; + this.transformFn = partialJoinFn; + // Read-only join store, no creator/updater functions required. + this.joinStoreFns = new StoreFunctions<>(m -> m.getKey(), null); + // Buffered message envelope store for this input stream. + this.selfStoreFns = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m); + this.operatorId = operatorId; + } + + @Override + public String toString() { + return this.operatorId; + } + + @Override + public MessageStreamImpl<RM> getOutputStream() { + return this.joinOutput; + } + + public StoreFunctions<JM, K, JM> getJoinStoreFns() { + return this.joinStoreFns; + } + + public StoreFunctions<M, K, M> getSelfStoreFns() { + return this.selfStoreFns; + } + + public BiFunction<M, JM, RM> getTransformFn() { + return this.transformFn; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java new file mode 100644 index 0000000..4348bc0 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.MessageStreamImpl; + + +/** + * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external + * system. This is a terminal operator and does not allow further operator chaining. + * + * @param <M> the type of input {@link MessageEnvelope} + */ +public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec { + + /** + * The user-defined sink function + */ + private final SinkFunction<M> sinkFn; + + /** + * Default constructor for a {@link SinkOperatorSpec}. + * + * @param sinkFn a user defined {@link SinkFunction} that will be called with the output {@link MessageEnvelope}, + * the output {@link org.apache.samza.task.MessageCollector} and the + * {@link org.apache.samza.task.TaskCoordinator}. + */ + SinkOperatorSpec(SinkFunction<M> sinkFn) { + this.sinkFn = sinkFn; + } + + /** + * This is a terminal operator and doesn't allow further operator chaining.
+ * @return null + */ + @Override + public MessageStreamImpl getOutputStream() { + return null; + } + + public SinkFunction<M> getSinkFn() { + return this.sinkFn; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java new file mode 100644 index 0000000..ed18da4 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.MessageStreamImpl; + + +/** + * The spec for a linear stream operator that outputs 0 or more {@link MessageEnvelope}s for each input {@link MessageEnvelope}. 
+ * + * @param <M> the type of input {@link MessageEnvelope} + * @param <OM> the type of output {@link MessageEnvelope} + */ +public class StreamOperatorSpec<M extends MessageEnvelope, OM extends MessageEnvelope> implements OperatorSpec<OM> { + + private final MessageStreamImpl<OM> outputStream; + + private final FlatMapFunction<M, OM> transformFn; + + /** + * Default constructor for a {@link StreamOperatorSpec}. + * + * @param transformFn the transformation function that transforms each input {@link MessageEnvelope} into a collection + * of output {@link MessageEnvelope}s + */ + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn) { + this(transformFn, new MessageStreamImpl<>()); + } + + /** + * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}. + * + * @param transformFn the transformation function + * @param outputStream the output {@link MessageStreamImpl} + */ + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream) { + this.outputStream = outputStream; + this.transformFn = transformFn; + } + + @Override + public MessageStreamImpl<OM> getOutputStream() { + return this.outputStream; + } + + public FlatMapFunction<M, OM> getTransformFn() { + return this.transformFn; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java new file mode 100644 index 0000000..2f5b1e7 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.windows.StoreFunctions; +import org.apache.samza.operators.windows.Trigger; +import org.apache.samza.operators.windows.WindowFn; +import org.apache.samza.operators.windows.WindowOutput; +import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.storage.kv.Entry; + +import java.util.function.BiFunction; + + +/** + * Defines a window operator that takes one {@link MessageStreamImpl} as an input, accumulates the window state, + * and generates an output {@link MessageStreamImpl} with output type {@code WM} which extends {@link WindowOutput} + * + * @param <M> the type of input {@link MessageEnvelope} + * @param <WK> the type of key in the output {@link MessageEnvelope} from the {@link WindowOperatorSpec} function + * @param <WS> the type of window state in the {@link WindowOperatorSpec} function + * @param <WM> the type of window output {@link MessageEnvelope} + */ +public class WindowOperatorSpec<M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements + OperatorSpec<WM> { + 
+ /** + * The output {@link MessageStream}. + */ + private final MessageStreamImpl<WM> outputStream; + + /** + * The window transformation function that takes {@link MessageEnvelope}s from one input stream, aggregates with the window + * state(s) from the window state store, and generate output {@link MessageEnvelope}s for the output stream. + */ + private final BiFunction<M, Entry<WK, WS>, WM> transformFn; + + /** + * The state store functions for the {@link WindowOperatorSpec}. + */ + private final StoreFunctions<M, WK, WS> storeFns; + + /** + * The window trigger. + */ + private final Trigger<M, WS> trigger; + + /** + * The unique ID of this operator. + */ + private final String operatorId; + + /** + * Constructor for {@link WindowOperatorSpec}. + * + * @param windowFn the window function + * @param operatorId auto-generated unique ID of this operator + */ + WindowOperatorSpec(WindowFn<M, WK, WS, WM> windowFn, String operatorId) { + this.outputStream = new MessageStreamImpl<>(); + this.transformFn = windowFn.getTransformFn(); + this.storeFns = windowFn.getStoreFns(); + this.trigger = windowFn.getTrigger(); + this.operatorId = operatorId; + } + + @Override + public String toString() { + return this.operatorId; + } + + @Override + public MessageStreamImpl<WM> getOutputStream() { + return this.outputStream; + } + + public StoreFunctions<M, WK, WS> getStoreFns() { + return this.storeFns; + } + + public BiFunction<M, Entry<WK, WS>, WM> getTransformFn() { + return this.transformFn; + } + + public Trigger<M, WS> getTrigger() { + return this.trigger; + } + + /** + * Method to generate the window operator's state store name + * TODO HIGH pmaheshw: should this be here? 
+ * + * @param inputStream the input {@link MessageStreamImpl} to this state store + * @return the persistent store name of the window operator + */ + public String getStoreName(MessageStream<M> inputStream) { + //TODO: need to get the persistent name of ds and the operator in a serialized form + return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java deleted file mode 100644 index c2f780d..0000000 --- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.task; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreams; -import org.apache.samza.operators.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.data.IncomingSystemMessage; -import org.apache.samza.operators.impl.ChainedOperators; -import org.apache.samza.operators.task.StreamOperatorTask; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.HashMap; -import java.util.Map; - - -/** - * An adaptor task class that invoke the user-implemented (@link StreamOperatorTask}s via {@link org.apache.samza.operators.MessageStream} programming APIs - * - */ -public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask { - /** - * A map with entries mapping {@link SystemStreamPartition} to {@link org.apache.samza.operators.impl.ChainedOperators} that takes the {@link SystemStreamPartition} - * as the input stream - */ - private final Map<SystemStreamPartition, ChainedOperators> operatorChains = new HashMap<>(); - - /** - * Wrapped {@link StreamOperatorTask} class - */ - private final StreamOperatorTask userTask; - - /** - * Constructor that wraps the user-defined {@link StreamOperatorTask} - * - * @param userTask the user-defined {@link StreamOperatorTask} - */ - public StreamOperatorAdaptorTask(StreamOperatorTask userTask) { - this.userTask = userTask; - } - - @Override - public final void init(Config config, TaskContext context) throws Exception { - if (this.userTask instanceof InitableTask) { - ((InitableTask) this.userTask).init(config, context); - } - Map<SystemStreamPartition, SystemMessageStream> sources = new HashMap<>(); - context.getSystemStreamPartitions().forEach(ssp -> { - SystemMessageStream ds = MessageStreams.input(ssp); - sources.put(ssp, ds); - }); - this.userTask.initOperators(sources.values()); - sources.forEach((ssp, ds) -> operatorChains.put(ssp, 
ChainedOperators.create(ds, context))); - } - - @Override - public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { - this.operatorChains.get(ime.getSystemStreamPartition()).onNext(new IncomingSystemMessage(ime), collector, coordinator); - } - - @Override - public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - this.operatorChains.forEach((ssp, chain) -> chain.onTimer(collector, coordinator)); - if (this.userTask instanceof WindowableTask) { - ((WindowableTask) this.userTask).window(collector, coordinator); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java new file mode 100644 index 0000000..e45d068 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators; + + +import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.windows.TriggerBuilder; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collection; +import java.util.Map; + + +/** + * Example implementation of split stream tasks + * + */ +public class BroadcastTask implements StreamOperatorTask { + class MessageType { + String field1; + String field2; + String field3; + String field4; + String parKey; + private long timestamp; + + public long getTimestamp() { + return this.timestamp; + } + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + @Override + public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) { + messageStreams.values().forEach(entry -> { + MessageStream<JsonMessageEnvelope> inputStream = entry.map(this::getInputMessage); + + inputStream.filter(this::myFilter1). + window(Windows.<JsonMessageEnvelope, String>intoSessionCounter( + m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). + setTriggers(TriggerBuilder.<JsonMessageEnvelope, Integer>earlyTriggerWhenExceedWndLen(100). + addLateTriggerOnSizeLimit(10). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter2). + window(Windows.<JsonMessageEnvelope, String>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)). + setTriggers(TriggerBuilder.<JsonMessageEnvelope, Collection<JsonMessageEnvelope>>earlyTriggerWhenExceedWndLen(100). 
+ addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter3). + window(Windows.<JsonMessageEnvelope, String, MessageType>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()). + setTriggers(TriggerBuilder.<JsonMessageEnvelope, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getMessage().getTimestamp(), 30000). + addTimeoutSinceFirstMessage(60000))); + }); + } + + JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope m1) { + return (JsonMessageEnvelope) m1.getMessage(); + } + + boolean myFilter1(JsonMessageEnvelope m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key1"); + } + + boolean myFilter2(JsonMessageEnvelope m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key2"); + } + + boolean myFilter3(JsonMessageEnvelope m1) { + return m1.getMessage().parKey.equals("key3"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java new file mode 100644 index 0000000..1b10609 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators; + +import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +/** + * Example implementation of unique key-based stream-stream join tasks + * + */ +public class JoinTask implements StreamOperatorTask { + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + MessageStream<JsonMessageEnvelope> joinOutput = null; + + @Override + public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) { + messageStreams.values().forEach(messageStream -> { + MessageStream<JsonMessageEnvelope> newSource = messageStream.map(this::getInputMessage); + if (joinOutput == null) { + joinOutput = newSource; + } else { + joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2)); + } + }); + } + + private JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope ism) { + return new JsonMessageEnvelope( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + 
ism.getSystemStreamPartition()); + } + + JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java new file mode 100644 index 0000000..61bb32a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.TaskContext; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class TestFluentStreamAdaptorTask { + Field userTaskField = null; + Field operatorChainsField = null; + + @Before + public void prep() throws NoSuchFieldException { + userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask"); + operatorChainsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + userTaskField.setAccessible(true); + operatorChainsField.setAccessible(true); + } + + @Test + public void testConstructor() throws IllegalAccessException { + StreamOperatorTask userTask = mock(StreamOperatorTask.class); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); + StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask); + Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask); + assertEquals(taskMemberVar, userTask); + assertTrue(chainsMap.isEmpty()); + } + + @Test + public void testInit() throws Exception { + StreamOperatorTask userTask = mock(StreamOperatorTask.class); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + 
Set<SystemStreamPartition> testInputs = new HashSet() { { + this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0))); + this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1))); + } }; + when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs); + adaptorTask.init(mockConfig, mockContext); + verify(userTask, times(1)).transform(Mockito.anyMap()); + Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask); + assertTrue(chainsMap.size() == 2); + assertTrue(chainsMap.containsKey(testInputs.toArray()[0])); + assertTrue(chainsMap.containsKey(testInputs.toArray()[1])); + } + + // TODO: window and process methods to be added after implementation of ChainedOperators.create() +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java new file mode 100644 index 0000000..d804bf8 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; + +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit test for {@link StreamOperatorTask} + */ +public class TestFluentStreamTasks { + + private final WindowTask userTask = new WindowTask(); + + private final BroadcastTask splitTask = new BroadcastTask(); + + private final JoinTask joinTask = new JoinTask(); + + private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { + for (int i = 0; i < 4; i++) { + this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i))); + } + } }; + + @Test + public void testUserTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, 
OperatorImpl> pipelineMap = + (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } + + @Test + public void testSplitTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, OperatorImpl> pipelineMap = + (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } + + @Test + public void testJoinTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, OperatorImpl> pipelineMap = + (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } + + +}
