Repository: samza Updated Branches: refs/heads/samza-sql fbdd76daa -> adcd26678
SAMZA-915: implementation of operator classes Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adcd2667 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adcd2667 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adcd2667 Branch: refs/heads/samza-sql Commit: adcd26678d3fb6ea632de7117bc40a9c4f343d59 Parents: fbdd76d Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Fri Oct 21 21:27:28 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Oct 21 21:31:47 2016 -0700 ---------------------------------------------------------------------- .../samza/operators/api/MessageStream.java | 50 ++++-- .../samza/operators/api/internal/Operators.java | 2 +- .../samza/operators/impl/ChainedOperators.java | 52 +++++- .../samza/operators/impl/OperatorFactory.java | 86 +++++++++ .../samza/operators/impl/OperatorImpl.java | 15 +- .../operators/impl/join/PartialJoinOpImpl.java | 44 +++++ .../impl/window/SessionWindowImpl.java | 17 +- .../samza/operators/api/TestMessageStream.java | 180 +++++++++++++++++++ .../samza/operators/api/TestOutputMessage.java | 47 +++++ .../operators/api/internal/TestOperators.java | 1 - .../operators/impl/TestChainedOperators.java | 129 +++++++++++++ .../operators/impl/TestOperatorFactory.java | 95 ++++++++++ .../samza/operators/impl/TestOperatorImpl.java | 1 + .../samza/operators/impl/TestOutputMessage.java | 47 ----- .../operators/impl/TestSimpleOperatorImpl.java | 1 + .../operators/impl/TestSinkOperatorImpl.java | 1 + .../impl/data/serializers/SqlAvroSerdeTest.java | 1 + .../impl/window/TestSessionWindowImpl.java | 53 ++---- .../samza/task/BroadcastOperatorTask.java | 41 +++-- .../samza/task/InputJsonSystemMessage.java | 2 +- 20 files changed, 729 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java index a01cee9..b5e1028 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java @@ -19,16 +19,20 @@ package org.apache.samza.operators.api; +import org.apache.samza.operators.api.Windows.Window; import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.WindowOutput; import org.apache.samza.operators.api.internal.Operators; -import org.apache.samza.operators.api.Windows.Window; +import org.apache.samza.operators.api.internal.Operators.Operator; +import org.apache.samza.operators.api.internal.WindowOutput; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; @@ -41,6 +45,19 @@ import java.util.function.Function; */ public class MessageStream<M extends Message> { + private final Set<Operator> subscribers = new HashSet<>(); + + /** + * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}. + * + * NOTE: This should only be used by implementation of {@link org.apache.samza.operators.impl.ChainedOperators}, not directly by programmers. + * + * @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object + */ + public Collection<Operator> getSubscribers() { + return Collections.unmodifiableSet(this.subscribers); + } + /** * Public API methods start here */ @@ -65,12 +82,14 @@ public class MessageStream<M extends Message> { * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} */ public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) { - return Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{ + Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{ OM r = mapper.apply(m); if (r != null) { this.add(r); } - }}).getOutputStream(); + }}); + this.subscribers.add(op); + return op.getOutputStream(); } /** @@ -81,7 +100,9 @@ public class MessageStream<M extends Message> { * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} */ public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) { - return Operators.getStreamOperator(flatMapper).getOutputStream(); + Operator<OM> op = Operators.getStreamOperator(flatMapper); + this.subscribers.add(op); + return op.getOutputStream(); } /** @@ -91,11 +112,13 @@ public class MessageStream<M extends Message> { * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream} */ public MessageStream<M> filter(Function<M, Boolean> filter) { - return Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{ + Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{ if (filter.apply(t)) { this.add(t); } - }}).getOutputStream(); + }}); + this.subscribers.add(op); + return op.getOutputStream(); } /** @@ -105,7 +128,7 @@ public class MessageStream<M extends Message> { * @param sink the user-defined sink function to send the input {@link Message}s to the external output systems */ public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { - Operators.getSinkOperator(sink); + this.subscribers.add(Operators.getSinkOperator(sink)); } /** @@ -119,7 +142,9 @@ public class MessageStream<M extends Message> { * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream} */ public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) { - return Operators.getWindowOperator(Windows.getInternalWindowFn(window)).getOutputStream(); + Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window)); + this.subscribers.add(wndOp); + return wndOp.getOutputStream(); } /** @@ -141,8 +166,8 @@ public class MessageStream<M extends Message> { // TODO: need to add default store functions for the two partial join functions - Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream); - Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream); + other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream)); + this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream)); return outputStream; } @@ -156,7 +181,8 @@ public class MessageStream<M extends Message> { MessageStream<M> outputStream = new MessageStream<>(); others.add(this); - others.forEach(other -> Operators.getMergeOperator(outputStream)); + others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream))); return outputStream; } + } http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java index f220285..e9bfe0b 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java @@ -55,7 +55,7 @@ public class Operators { * Private interface for stream operator functions. The interface class defines the output of the stream operator function. * */ - private interface Operator<OM extends Message> { + public interface Operator<OM extends Message> { MessageStream<OM> getOutputStream(); } http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java index 49cfdeb..59de16b 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java @@ -21,10 +21,16 @@ package org.apache.samza.operators.impl; import org.apache.samza.operators.api.MessageStream; import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.Operator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + /** * Implementation class for a chain of operators from the single input {@code source} @@ -33,6 +39,8 @@ import org.apache.samza.task.TaskCoordinator; */ public class ChainedOperators<M extends Message> { + private final Set<OperatorImpl> subscribers = new HashSet<>(); + /** * Private constructor * @@ -41,7 +49,38 @@ public class ChainedOperators<M extends Message> { */ private ChainedOperators(MessageStream<M> source, TaskContext context) { // create the pipeline/topology starting from source - // pass in the context s.t. stateful stream operators can initialize their stores + source.getSubscribers().forEach(sub -> { + // pass in the context s.t. stateful stream operators can initialize their stores + OperatorImpl subImpl = this.createAndSubscribe(sub, source, context); + this.subscribers.add(subImpl); + }); + } + + /** + * Private function to recursively instantiate the implementation of operators and the chains + * + * @param operator the operator that subscribe to {@code source} + * @param source the source {@link MessageStream} + * @param context the context of the task + * @return the implementation object of the corresponding {@code operator} + */ + private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source, + TaskContext context) { + Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator); + if (factoryEntry.getValue()) { + // The operator has already been instantiated and we do not need to traverse and create the subscribers any more. + return factoryEntry.getKey(); + } + OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey(); + MessageStream outStream = operator.getOutputStream(); + Collection<Operator> subs = outStream.getSubscribers(); + subs.forEach(sub -> { + OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context); + opImpl.subscribe(subImpl); + }); + // initialize the operator's state store + opImpl.init(source, context); + return opImpl; } /** @@ -64,10 +103,17 @@ public class ChainedOperators<M extends Message> { * @param coordinator the {@link TaskCoordinator} object within the process context */ public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - // TODO: add implementation of onNext() that actually triggers the process pipeline + this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator)); } + /** + * Method to handle timer events + * + * @param collector the {@link MessageCollector} object within the process context + * @param coordinator the {@link TaskCoordinator} object within the process context + */ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - // TODO: add implementation of onTimer() that actually calls the corresponding window operator's onTimer() methods + long nanoTime = System.nanoTime(); + this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java new file mode 100644 index 0000000..f16cbc6 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.commons.collections.keyvalue.AbstractMapEntry; +import org.apache.samza.operators.api.WindowState; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.*; +import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.operators.impl.join.PartialJoinOpImpl; +import org.apache.samza.operators.impl.window.SessionWindowImpl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Map.Entry; + + +/** + * The factory class that instantiates all implementation of {@link OperatorImpl} classes. + */ +public class OperatorFactory { + + /** + * the static operatorMap that includes all operator implementation instances + */ + private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> operatorMap = new ConcurrentHashMap<>(); + + /** + * The method to actually create the implementation instances of operators + * + * @param operator the immutable definition of {@link Operator} + * @param <M> type of input {@link Message} + * @param <RM> type of output {@link Message} + * @return the implementation object of {@link OperatorImpl} + */ + private static <M extends Message, RM extends Message> OperatorImpl<M, ? extends Message> createOperator(Operator<RM> operator) { + if (operator instanceof StreamOperator) { + return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator); + } else if (operator instanceof SinkOperator) { + return new SinkOperatorImpl<>((SinkOperator<M>) operator); + } else if (operator instanceof WindowOperator) { + return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator); + } else if (operator instanceof PartialJoinOperator) { + return new PartialJoinOpImpl<>((PartialJoinOperator) operator); + } + throw new IllegalArgumentException( + String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName())); + } + + /** + * The method to get the unique implementation instance of {@link Operator} + * + * @param operator the {@link Operator} to instantiate + * @param <M> type of input {@link Message} + * @param <RM> type of output {@link Message} + * @return A pair of entry that include the unique implementation instance to the {@code operator} and a boolean value indicating whether + * the operator instance has already been created or not. True means the operator instance has already created, false means the operator + * was not created. + */ + public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) { + if (!operatorMap.containsKey(operator)) { + OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator); + if( operatorMap.putIfAbsent(operator, operatorImpl) == null ) { + return new AbstractMapEntry(operatorImpl, false) {}; + } + } + return new AbstractMapEntry((OperatorImpl<M, ? extends Message>) operatorMap.get(operator), true) {}; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 81a7ede..3ca8bde 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.MessageStream; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -61,6 +62,17 @@ public abstract class OperatorImpl<M extends Message, RM extends Message> } /** + * Default method for timer event + * + * @param nanoTime the system nano-second when the timer event is triggered + * @param collector the {@link MessageCollector} in the context + * @param coordinator the {@link TaskCoordinator} in the context + */ + public void onTimer(long nanoTime, MessageCollector collector, TaskCoordinator coordinator) { + this.subscribers.forEach(sub -> ((OperatorImpl)sub).onTimer(nanoTime, collector, coordinator)); + } + + /** * Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers. * * @param message the input {@link Message} @@ -72,9 +84,10 @@ public abstract class OperatorImpl<M extends Message, RM extends Message> /** * Stateful operators will need to override this method to initialize the operators * + * @param source the source that this {@link OperatorImpl} object subscribe to * @param context the task context to initialize the operators within */ - protected void init(TaskContext context) {}; + protected void init(MessageStream<M> source, TaskContext context) {}; /** * Method to trigger all downstream operators that consumes the output {@link org.apache.samza.operators.api.MessageStream} http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java new file mode 100644 index 0000000..bbe08a4 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.join; + +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation of a {@link org.apache.samza.operators.api.internal.Operators.PartialJoinOperator}. This class implements function + * that only takes in one input stream among all inputs to the join and generate the join output. + * + * @param <M> Type of input stream {@link org.apache.samza.operators.api.data.Message} + * @param <RM> Type of join output stream {@link org.apache.samza.operators.api.data.Message} + */ +public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> { + + public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) { + // TODO: implement PartialJoinOpImpl constructor + } + + @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + // TODO: implement PartialJoinOpImpl processing logic + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java index 2de53aa..59e2dec 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java @@ -30,29 +30,27 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; -import java.util.function.BiFunction; - /** * Default implementation class of a {@link WindowOperator} for a session window. * * @param <M> the type of input {@link Message} * @param <RK> the type of window key + * @param <WS> the type of window state * @param <RM> the type of aggregated value of the window */ public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> extends OperatorImpl<M, RM> { - private final BiFunction<M, Entry<RK, WS>, RM> txfmFunction; - private final StateStoreImpl<M, RK, WS> wndStore; + private final WindowOperator<M, RK, WS, RM> sessWnd; + private StateStoreImpl<M, RK, WS> wndStore = null; - SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd, MessageStream<M> input) { - this.txfmFunction = sessWnd.getFunction(); - this.wndStore = new StateStoreImpl<>(sessWnd.getStoreFunctions(), sessWnd.getStoreName(input)); + public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) { + this.sessWnd = sessWnd; } @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { Entry<RK, WS> state = this.wndStore.getState(message); - this.nextProcessors(this.txfmFunction.apply(message, state), collector, coordinator); + this.nextProcessors(this.sessWnd.getFunction().apply(message, state), collector, coordinator); this.wndStore.updateState(message, state); } @@ -60,7 +58,8 @@ public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM // This is to periodically check the timeout triggers to get the list of window states to be updated } - @Override protected void init(TaskContext context) { + @Override protected void init(MessageStream<M> source, TaskContext context) { + this.wndStore = new StateStoreImpl<>(this.sessWnd.getStoreFunctions(), sessWnd.getStoreName(source)); this.wndStore.init(context); } } http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java new file mode 100644 index 0000000..9f9ad6b --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api; + +import org.apache.samza.operators.api.internal.Operators.*; +import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestMessageStream { + + @Test public void testMap() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2); + MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestOutputMessage> mapOp = subs.iterator().next(); + assertTrue(mapOp instanceof StreamOperator); + assertEquals(mapOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + TestMessage xTestMsg = mock(TestMessage.class); + when(xTestMsg.getKey()).thenReturn("test-msg-key"); + when(xTestMsg.getMessage()).thenReturn("123456789"); + when(xTestMsg.getTimestamp()).thenReturn(12345L); + Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg); + assertEquals(cOutputMsg.size(), 1); + TestOutputMessage outputMessage = cOutputMsg.iterator().next(); + assertEquals(outputMessage.getKey(), xTestMsg.getKey()); + assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1)); + assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2); + } + + @Test public void testFlatMap() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() {{ + this.add(mock(TestOutputMessage.class)); + this.add(mock(TestOutputMessage.class)); + this.add(mock(TestOutputMessage.class)); + }}; + Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts; + MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestOutputMessage> flatMapOp = subs.iterator().next(); + assertTrue(flatMapOp instanceof StreamOperator); + assertEquals(flatMapOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap); + } + + @Test public void testFilter() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L; + MessageStream<TestMessage> outputStream = inputStream.filter(xFilter); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> filterOp = subs.iterator().next(); + assertTrue(filterOp instanceof StreamOperator); + assertEquals(filterOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction(); + TestMessage mockMsg = mock(TestMessage.class); + when(mockMsg.getTimestamp()).thenReturn(11111L); + Collection<TestMessage> output = txfmFn.apply(mockMsg); + assertTrue(output.isEmpty()); + when(mockMsg.getTimestamp()).thenReturn(999999L); + output = txfmFn.apply(mockMsg); + assertEquals(output.size(), 1); + assertEquals(output.iterator().next(), mockMsg); + } + + @Test public void testSink() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> { + mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); + tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + }; + inputStream.sink(xSink); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> sinkOp = subs.iterator().next(); + assertTrue(sinkOp instanceof SinkOperator); + assertEquals(((SinkOperator) sinkOp).getFunction(), xSink); + assertNull(((SinkOperator) sinkOp).getOutputStream()); + } + + @Test public void testWindow() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class); + MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> wndOp = subs.iterator().next(); + assertTrue(wndOp instanceof WindowOperator); + assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream); + } + + @Test public void testJoin() { + MessageStream<TestMessage> source1 = new MessageStream<>(); + MessageStream<TestMessage> source2 = new MessageStream<>(); + BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp()); + MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner); + Collection<Operator> subs = source1.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> joinOp1 = subs.iterator().next(); + assertTrue(joinOp1 instanceof PartialJoinOperator); + assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput); + subs = source2.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> joinOp2 = subs.iterator().next(); + assertTrue(joinOp2 instanceof PartialJoinOperator); + assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput); + TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L); + TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L); + TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + assertEquals(xOut.getTimestamp(), 11111L); + xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + assertEquals(xOut.getTimestamp(), 11111L); + } + + @Test public void testMerge() { + MessageStream<TestMessage> merge1 = new MessageStream<>(); + Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>(){{ + this.add(new MessageStream<>()); + this.add(new MessageStream<>()); + }}; + MessageStream<TestMessage> mergeOutput = merge1.merge(others); + validateMergeOperator(merge1, mergeOutput); + + others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); + } + + private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) { + Collection<Operator> subs = mergeSource.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> mergeOp = subs.iterator().next(); + assertTrue(mergeOp instanceof StreamOperator); + assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput); + TestMessage mockMsg = mock(TestMessage.class); + Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg); + assertEquals(outputs.size(), 1); + assertEquals(outputs.iterator().next(), mockMsg); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java new file mode 100644 index 0000000..225e77f --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api; + +import org.apache.samza.operators.api.data.Message; + + +public class TestOutputMessage implements Message<String, Integer> { + private final String key; + private final Integer value; + private final long timestamp; + + public TestOutputMessage(String key, Integer value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public Integer getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java index 65c37e9..6dc77e5 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java @@ -117,7 +117,6 @@ public class TestOperators { @Test public void testGetMergeOperator() { MessageStream<TestMessage> output = new MessageStream<>(); - Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output); Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() {{ this.add(t); http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java new file mode 100644 index 0000000..d4d2378 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.TestOutputMessage; +import org.apache.samza.operators.api.Windows; +import org.apache.samza.task.TaskContext; +import org.junit.Before; +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + + +public class TestChainedOperators { + Field subsField = null; + Field opSubsField = null; + + @Before public void prep() throws NoSuchFieldException { + subsField = ChainedOperators.class.getDeclaredField("subscribers"); + subsField.setAccessible(true); + opSubsField = OperatorImpl.class.getDeclaredField("subscribers"); + opSubsField.setAccessible(true); + } + + @Test public void testCreate() { + // test creation of empty chain + MessageStream<TestMessage> testStream = new MessageStream<>(); + TaskContext mockContext = mock(TaskContext.class); + ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testStream, mockContext); + assertTrue(operatorChain != null); + } + + @Test public void testLinearChain() throws IllegalAccessException { + // test creation of linear chain + MessageStream<TestMessage> testInput = new MessageStream<>(); + TaskContext mockContext = mock(TaskContext.class); + testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessage::getKey)); + ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext); + Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain); + assertEquals(subsSet.size(), 1); + OperatorImpl<TestMessage, TestMessage> firstOpImpl = subsSet.iterator().next(); + Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(firstOpImpl); + assertEquals(subsOps.size(), 1); + Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = subsOps.iterator().next(); + subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(wndOpImpl); + assertEquals(subsOps.size(), 0); + } + + @Test public void testBroadcastChain() throws IllegalAccessException { + // test creation of broadcast chain + MessageStream<TestMessage> testInput = new MessageStream<>(); + TaskContext mockContext = mock(TaskContext.class); + testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new ArrayList() {{ this.add(m); this.add(m); }}); + testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m); + ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext); + Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain); + assertEquals(subsSet.size(), 2); + Iterator<OperatorImpl> iter = subsSet.iterator(); + // check the first branch w/ flatMap + OperatorImpl<TestMessage, TestMessage> opImpl = iter.next(); + Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl); + assertEquals(subsOps.size(), 1); + Subscriber<? super ProcessorContext<TestMessage>> flatMapImpl = subsOps.iterator().next(); + subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(flatMapImpl); + assertEquals(subsOps.size(), 0); + // check the second branch w/ map + opImpl = iter.next(); + subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl); + assertEquals(subsOps.size(), 1); + Subscriber<? super ProcessorContext<TestMessage>> mapImpl = subsOps.iterator().next(); + subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(mapImpl); + assertEquals(subsOps.size(), 0); + } + + @Test public void testJoinChain() throws IllegalAccessException { + // test creation of join chain + MessageStream<TestMessage> input1 = new MessageStream<>(); + MessageStream<TestMessage> input2 = new MessageStream<>(); + TaskContext mockContext = mock(TaskContext.class); + input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m -> m); + // now, we create chained operators from each input sources + ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, mockContext); + ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, mockContext); + // check that those two chains will merge at map operator + // first branch of the join + Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1); + assertEquals(subsSet.size(), 1); + OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = subsSet.iterator().next(); + Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp1); + assertEquals(subsOps.size(), 1); + // the map operator consumes the common join output, where two branches merge + Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = subsOps.iterator().next(); + // second branch of the join + subsSet = (Set<OperatorImpl>) subsField.get(chain2); + assertEquals(subsSet.size(), 1); + OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = subsSet.iterator().next(); + assertNotSame(joinOp1, joinOp2); + subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp2); + assertEquals(subsOps.size(), 1); + // make sure that the map operator is the same + assertEquals(mapImpl, subsOps.iterator().next()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java new file mode 100644 index 0000000..d228784 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.TestOutputMessage; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator; +import org.apache.samza.operators.api.internal.Operators.SinkOperator; +import org.apache.samza.operators.api.internal.Operators.StreamOperator; +import org.apache.samza.operators.api.internal.Operators.WindowOperator; +import org.apache.samza.operators.impl.join.PartialJoinOpImpl; +import org.apache.samza.operators.impl.window.SessionWindowImpl; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestOperatorFactory { + + @Test public void testGetOperator() throws NoSuchFieldException, IllegalAccessException { + // get window operator + WindowOperator mockWnd = mock(WindowOperator.class); + Map.Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean> + factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockWnd); + assertFalse(factoryEntry.getValue()); + OperatorImpl<TestMessage, TestOutputMessage> opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey(); + assertTrue(opImpl instanceof SessionWindowImpl); + Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd"); + sessWndField.setAccessible(true); + WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl); + assertEquals(sessWnd, mockWnd); + + // get simple operator + StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = mock(StreamOperator.class); + Function<TestMessage, Collection<TestOutputMessage>> mockTxfmFn = mock(Function.class); + when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn); + factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockSimpleOp); + opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey(); + assertTrue(opImpl instanceof SimpleOperatorImpl); + Field txfmFnField = SimpleOperatorImpl.class.getDeclaredField("transformFn"); + txfmFnField.setAccessible(true); + assertEquals(mockTxfmFn, txfmFnField.get(opImpl)); + + // get sink operator + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, mc, tc) -> {}; + SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class); + when(sinkOp.getFunction()).thenReturn(sinkFn); + factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(sinkOp); + opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey(); + assertTrue(opImpl instanceof SinkOperatorImpl); + Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc"); + sinkFnField.setAccessible(true); + assertEquals(sinkFn, sinkFnField.get(opImpl)); + + // get join operator + PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> joinOp = mock(PartialJoinOperator.class); + TestOutputMessage mockOutput = mock(TestOutputMessage.class); + BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) -> mockOutput; + when(joinOp.getFunction()).thenReturn(joinFn); + factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(joinOp); + opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey(); + assertTrue(opImpl instanceof PartialJoinOpImpl); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 9445f3a..d296111 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.TestOutputMessage; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java deleted file mode 100644 index 4bcf767..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.operators.api.data.Message; - - -class TestOutputMessage implements Message<String, Integer> { - private final String key; - private final Integer value; - private final long timestamp; - - TestOutputMessage(String key, Integer value, long timestamp) { - this.key = key; - this.value = value; - this.timestamp = timestamp; - } - - @Override public Integer getMessage() { - return this.value; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java index 50154f0..c8c4944 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.TestOutputMessage; import org.apache.samza.operators.api.internal.Operators.StreamOperator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index eb8a23a..e711bc5 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.TestOutputMessage; import org.apache.samza.operators.api.internal.Operators.SinkOperator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java index 10ee2c7..5aa28bb 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java @@ -64,6 +64,7 @@ public class SqlAvroSerdeTest { @Test public void testSqlAvroSerialization() throws IOException { AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); + @SuppressWarnings("unchecked") byte[] encodedDatum = serde.toBytes(decodedDatumOriginal); AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum); http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java index 6947464..75cb00c 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java @@ -24,7 +24,6 @@ import org.apache.samza.operators.api.WindowState; import org.apache.samza.operators.api.internal.Operators.StoreFunctions; import org.apache.samza.operators.api.internal.Operators.WindowOperator; import org.apache.samza.operators.api.internal.WindowOutput; -import org.apache.samza.operators.impl.StateStoreImpl; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; @@ -44,67 +43,41 @@ import static org.mockito.Mockito.*; public class TestSessionWindowImpl { Field wndStoreField = null; - Field txfmFnField = null; + Field sessWndField = null; @Before public void prep() throws NoSuchFieldException { wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore"); - txfmFnField = SessionWindowImpl.class.getDeclaredField("txfmFunction"); + sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd"); wndStoreField.setAccessible(true); - txfmFnField.setAccessible(true); + sessWndField.setAccessible(true); } @Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException { // test constructing a SessionWindowImpl w/ expected mock functions - MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class); WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class); - StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class); - when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns); - when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store"); - BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class); - when(wndOp.getFunction()).thenReturn(mockTxfmFn); - SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm); - BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>> txfmFn = - (BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>>) txfmFnField.get(sessWnd); - assertEquals(mockTxfmFn, txfmFn); - StateStoreImpl<TestMessage, String, WindowState<Integer>> storeImpl = - (StateStoreImpl<TestMessage, String, WindowState<Integer>>) wndStoreField.get(sessWnd); - - // test init() and make sure the wndStore is initialized as expected - TestMessage mockMsg = mock(TestMessage.class); - TaskContext mockContext = mock(TaskContext.class); - KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class); - when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore); - Function<TestMessage, String> wndKeyFn = m -> "test-msg-key"; - when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn); - WindowState<Integer> mockState = mock(WindowState.class); - when(mockKvStore.get("test-msg-key")).thenReturn(mockState); - storeImpl.init(mockContext); - Entry<String, WindowState<Integer>> stateEntry = storeImpl.getState(mockMsg); - verify(mockStoreFns, times(1)).getStoreKeyFinder(); - verify(mockKvStore, times(1)).get("test-msg-key"); - assertEquals(stateEntry.getKey(), "test-msg-key"); - assertEquals(stateEntry.getValue(), mockState); + SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp); + assertEquals(wndOp, sessWndField.get(sessWnd)); } - @Test public void testInitAndProcess() { - MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class); + @Test public void testInitAndProcess() throws IllegalAccessException { WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class); + SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp); + + // construct and init the SessionWindowImpl object + MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class); StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class); Function<TestMessage, String> wndKeyFn = m -> "test-msg-key"; when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn); when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns); when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store"); - BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class); when(wndOp.getFunction()).thenReturn(mockTxfmFn); - - // construct and init the SessionWindowImpl object - SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm); TaskContext mockContext = mock(TaskContext.class); KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class); when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore); - sessWnd.init(mockContext); + sessWnd.init(mockInputStrm, mockContext); - // test onNext() method. Make sure the right methods are invoked. + // test onNext() method. Make sure the transformation function and the state update functions are invoked. TestMessage mockMsg = mock(TestMessage.class); MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java index 91b0074..724bbba 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java @@ -56,27 +56,26 @@ public class BroadcastOperatorTask implements StreamOperatorTask { @Override public void initOperators(Collection<SystemMessageStream> sources) { sources.forEach(source -> { - MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage); - - inputStream.filter(this::myFilter1). - window(Windows.<JsonMessage, String>intoSessionCounter( - m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). - setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). - addLateTriggerOnSizeLimit(10). - addTimeoutSinceLastMessage(30000))); - - inputStream.filter(this::myFilter2). - window(Windows.<JsonMessage, String>intoSessions( - m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)). - setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100). - addTimeoutSinceLastMessage(30000))); - - inputStream.filter(this::myFilter3). - window(Windows.<JsonMessage, String, MessageType>intoSessions( - m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()). - setTriggers(TriggerBuilder - .<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000). - addTimeoutSinceFirstMessage(60000))); + MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage); + + inputStream.filter(this::myFilter1). + window(Windows.<JsonMessage, String>intoSessionCounter( + m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). + setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). + addLateTriggerOnSizeLimit(10). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter2). + window(Windows.<JsonMessage, String>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)). + setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter3). + window(Windows.<JsonMessage, String, MessageType>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()). + setTriggers(TriggerBuilder.<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000). + addTimeoutSinceFirstMessage(60000))); } ); } http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java index 5e710b2..33ae9c9 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java +++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java @@ -25,7 +25,7 @@ import org.apache.samza.system.SystemStreamPartition; /** - * Example input message w/ avro message body and string as the key. + * Example input message w/ Json message body and string as the key. */ public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> {
