http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java new file mode 100644 index 0000000..489e5b8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.storage.kv.Entry; + +import java.util.function.BiFunction; + + +/** + * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used + * by the internal representation and implementation classes in operators. + * + * @param <M> type of input stream {@link Message} for window + * @param <WK> type of window key in the output {@link Message} + * @param <WS> type of {@link WindowState} variable in the state store + * @param <WM> type of the message in the output stream + */ +public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> { + + /** + * get the transformation function of the {@link WindowFn} + * + * @return the transformation function takes type {@code M} message and the window state entry, then transform to an {@link WindowOutput} + */ + BiFunction<M, Entry<WK, WS>, WM> getTransformFunc(); + + /** + * get the state store functions for this {@link WindowFn} + * + * @return the collection of state store methods + */ + Operators.StoreFunctions<M, WK, WS> getStoreFuncs(); + + /** + * get the trigger conditions for this {@link WindowFn} + * + * @return the trigger condition for the {@link WindowFn} function + */ + Trigger<M, WS> getTrigger(); + +}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java new file mode 100644 index 0000000..643b703 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.data.Message; + + +/** + * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function + * + * @param <K> the type of key in the output window result + * @param <M> the type of value in the output window result + */ +public final class WindowOutput<K, M> implements Message<K, M> { + private final K key; + private final M value; + + WindowOutput(K key, M aggregated) { + this.key = key; + this.value = aggregated; + } + + @Override public M getMessage() { + return this.value; + } + + @Override public K getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return 0; + } + + static public <K, M> WindowOutput<K, M> of(K key, M result) { + return new WindowOutput<>(key, result); + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java new file mode 100644 index 0000000..42c8f74 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.task; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; + +import java.util.Collection; + +/** + * This interface defines the methods that user needs to implement via the operator programming APIs. + */ [email protected] +public interface StreamOperatorTask { + + /** + * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s. + * Users have to implement this function to define their transformation logic on each of the incoming + * {@link SystemMessageStream}. + * + * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition} + * + * @param sources the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.data.IncomingSystemMessage} + * from a {@link org.apache.samza.system.SystemStreamPartition} + */ + void initOperators(Collection<SystemMessageStream> sources); + + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java index 5d066c5..7d9d56c 100644 --- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java +++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java @@ -19,12 +19,13 @@ package org.apache.samza.config; -import static org.junit.Assert.*; +import org.junit.Test; -import java.util.Map; import java.util.HashMap; +import java.util.Map; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class TestConfig { /** http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java new file mode 100644 index 0000000..8c56287 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.Message; + + +public class TestMessage implements Message<String, String> { + + private final String key; + private final String value; + private final long timestamp; + + TestMessage(String key, String value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public String getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java new file mode 100644 index 0000000..4dbe233 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.internal.Operators.*; +import org.apache.samza.operators.internal.WindowOutput; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestMessageStream { + + @Test public void testMap() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2); + MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestOutputMessage> mapOp = subs.iterator().next(); + assertTrue(mapOp instanceof StreamOperator); + assertEquals(mapOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + TestMessage xTestMsg = mock(TestMessage.class); + when(xTestMsg.getKey()).thenReturn("test-msg-key"); + when(xTestMsg.getMessage()).thenReturn("123456789"); + when(xTestMsg.getTimestamp()).thenReturn(12345L); + Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg); + assertEquals(cOutputMsg.size(), 1); + TestOutputMessage outputMessage = cOutputMsg.iterator().next(); + assertEquals(outputMessage.getKey(), xTestMsg.getKey()); + assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1)); + assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2); + } + + @Test public void testFlatMap() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { { + this.add(mock(TestOutputMessage.class)); + this.add(mock(TestOutputMessage.class)); + this.add(mock(TestOutputMessage.class)); + } }; + Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts; + MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestOutputMessage> flatMapOp = subs.iterator().next(); + assertTrue(flatMapOp instanceof StreamOperator); + assertEquals(flatMapOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap); + } + + @Test public void testFilter() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L; + MessageStream<TestMessage> outputStream = inputStream.filter(xFilter); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> filterOp = subs.iterator().next(); + assertTrue(filterOp instanceof StreamOperator); + assertEquals(filterOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction(); + TestMessage mockMsg = mock(TestMessage.class); + when(mockMsg.getTimestamp()).thenReturn(11111L); + Collection<TestMessage> output = txfmFn.apply(mockMsg); + assertTrue(output.isEmpty()); + when(mockMsg.getTimestamp()).thenReturn(999999L); + output = txfmFn.apply(mockMsg); + assertEquals(output.size(), 1); + assertEquals(output.iterator().next(), mockMsg); + } + + @Test public void testSink() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> { + mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); + tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + }; + inputStream.sink(xSink); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> sinkOp = subs.iterator().next(); + assertTrue(sinkOp instanceof SinkOperator); + assertEquals(((SinkOperator) sinkOp).getFunction(), xSink); + assertNull(((SinkOperator) sinkOp).getOutputStream()); + } + + @Test public void testWindow() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class); + MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> wndOp = subs.iterator().next(); + assertTrue(wndOp instanceof WindowOperator); + assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream); + } + + @Test public void testJoin() { + MessageStream<TestMessage> source1 = new MessageStream<>(); + MessageStream<TestMessage> source2 = new MessageStream<>(); + BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp()); + MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner); + Collection<Operator> subs = source1.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> joinOp1 = subs.iterator().next(); + assertTrue(joinOp1 instanceof PartialJoinOperator); + assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput); + subs = source2.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> joinOp2 = subs.iterator().next(); + assertTrue(joinOp2 instanceof PartialJoinOperator); + assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput); + TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L); + TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L); + TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + assertEquals(xOut.getTimestamp(), 11111L); + xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + assertEquals(xOut.getTimestamp(), 11111L); + } + + @Test public void testMerge() { + MessageStream<TestMessage> merge1 = new MessageStream<>(); + Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>() { { + this.add(new MessageStream<>()); + this.add(new MessageStream<>()); + } }; + MessageStream<TestMessage> mergeOutput = merge1.merge(others); + validateMergeOperator(merge1, mergeOutput); + + others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); + } + + private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) { + Collection<Operator> subs = mergeSource.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> mergeOp = subs.iterator().next(); + assertTrue(mergeOp instanceof StreamOperator); + assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput); + TestMessage mockMsg = mock(TestMessage.class); + Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg); + assertEquals(outputs.size(), 1); + assertEquals(outputs.iterator().next(), mockMsg); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java new file mode 100644 index 0000000..c5fcceb --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.Partition; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class TestMessageStreams { + + @Test public void testInput() { + SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0)); + MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp); + assertEquals(mSysStream.getSystemStreamPartition(), ssp); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java new file mode 100644 index 0000000..14e6562 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.Message; + + +public class TestOutputMessage implements Message<String, Integer> { + private final String key; + private final Integer value; + private final long timestamp; + + public TestOutputMessage(String key, Integer value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public Integer getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java new file mode 100644 index 0000000..927b14b --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestTriggerBuilder { + private Field earlyTriggerField; + private Field lateTriggerField; + private Field timerTriggerField; + private Field earlyTriggerUpdater; + private Field lateTriggerUpdater; + + @Before + public void testPrep() throws Exception { + this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); + this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); + this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); + this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); + this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); + + this.earlyTriggerField.setAccessible(true); + this.lateTriggerField.setAccessible(true); + this.timerTriggerField.setAccessible(true); + this.earlyTriggerUpdater.setAccessible(true); + this.lateTriggerUpdater.setAccessible(true); + } + + @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + when(mockState.getNumberMessages()).thenReturn(2000L); + assertTrue(triggerField.apply(null, mockState)); + + Function<TestMessage, Boolean> tokenFunc = m -> true; + builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + TestMessage m = mock(TestMessage.class); + assertTrue(triggerField.apply(m, mockState)); + + builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); + when(m.getTimestamp()).thenReturn(19999000000L); + assertFalse(triggerField.apply(m, mockState)); + when(m.getTimestamp()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + when(m.getTimestamp()).thenReturn(1001000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class); + builder = TriggerBuilder.earlyTrigger(mockFunc); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + assertEquals(triggerField, mockFunc); + + builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); + Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + builder = TriggerBuilder.timeoutSinceLastMessage(10000L); + timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test public void testAddTimerTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addTimeoutSinceFirstMessage(10000L); + // exam that both earlyTrigger and timer triggers are set up + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + // check the timer trigger + Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + // exam that both early trigger and timer triggers are set up + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + builder.addTimeoutSinceLastMessage(20000L); + // check the timer trigger + timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test public void testAddLateTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTriggerOnSizeLimit(10000L); + // exam that both earlyTrigger and lateTriggers are set up + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // check the late trigger + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + // set the number of messages to 10001 to trigger the late trigger + when(mockState.getNumberMessages()).thenReturn(10001L); + assertTrue(lateTrigger.apply(null, mockState)); + + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); + // exam that both earlyTrigger and lateTriggers are set up + earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // exam the lateTrigger + when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); + lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + List<TestMessage> mockList = mock(ArrayList.class); + when(mockList.size()).thenReturn(200); + when(mockState.getOutputValue()).thenReturn(mockList); + assertTrue(lateTrigger.apply(null, mockState)); + } + + @Test public void testAddTriggerUpdater() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.onEarlyTrigger(c -> { + c.clear(); + return c; + }); + List<TestMessage> collection = new ArrayList<TestMessage>() { { + for (int i = 0; i < 10; i++) { + this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime())); + } + } }; + // exam that earlyTriggerUpdater is set up + Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater = + (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getOutputValue()).thenReturn(collection); + earlyTriggerUpdater.apply(mockState); + assertTrue(collection.isEmpty()); + + collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime())); + collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime())); + builder.onLateTrigger(c -> { + c.removeIf(t -> t.getKey().equals("key-to-remove")); + return c; + }); + // check the late trigger updater + Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater = + (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder); + when(mockState.getOutputValue()).thenReturn(collection); + lateTriggerUpdater.apply(mockState); + assertTrue(collection.size() == 1); + assertFalse(collection.get(0).isDelete()); + assertEquals(collection.get(0).getKey(), "key-to-stay"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java new file mode 100644 index 0000000..8a25a96 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.Windows.Window; +import org.apache.samza.operators.internal.Trigger; +import org.apache.samza.operators.internal.WindowOutput; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestWindows { + + @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { + // test constructing the default session window + Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions( + TestMessage::getKey); + assertTrue(testWnd instanceof Windows.SessionWindow); + Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction"); + Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator"); + wndKeyFuncField.setAccessible(true); + aggregatorField.setAccessible(true); + Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd); + assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key"); + BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc = + (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd); + TestMessage mockMsg = mock(TestMessage.class); + Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains(mockMsg)); + + // test constructing the session window w/ customized session info + Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions( + m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0)); + assertTrue(testWnd2 instanceof Windows.SessionWindow); + wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2); + aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2); + assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0"); + when(mockMsg.getMessage()).thenReturn("x-001"); + collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains('x')); + + // test constructing session window w/ a default counter + Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getTimestamp())); + assertTrue(testCounter instanceof Windows.SessionWindow); + wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter); + BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter); + when(mockMsg.getTimestamp()).thenReturn(12345L); + assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); + assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); + } + + @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { + Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getTimestamp())); + // test session window w/ a trigger + TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); + testCounter.setTriggers(triggerBuilder); + Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); + Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); + // examine all trigger fields are expected + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdater.setAccessible(true); + lateTriggerUpdater.setAccessible(true); + assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); + assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); + assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); + assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); + assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java new file mode 100644 index 0000000..b734e87 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestIncomingSystemMessage { + + @Test public void testConstructor() { + IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class); + IncomingSystemMessage ism = new IncomingSystemMessage(ime); + + Object mockKey = mock(Object.class); + Object mockValue = mock(Object.class); + LongOffset testOffset = new LongOffset("12345"); + SystemStreamPartition mockSsp = mock(SystemStreamPartition.class); + + when(ime.getKey()).thenReturn(mockKey); + when(ime.getMessage()).thenReturn(mockValue); + when(ime.getSystemStreamPartition()).thenReturn(mockSsp); + when(ime.getOffset()).thenReturn("12345"); + + assertEquals(ism.getKey(), mockKey); + assertEquals(ism.getMessage(), mockValue); + assertEquals(ism.getSystemStreamPartition(), mockSsp); + assertEquals(ism.getOffset(), testOffset); + assertFalse(ism.isDelete()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java new file mode 100644 index 0000000..943c47f --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +import org.junit.Test; + +import java.lang.reflect.Field; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + + +public class TestLongOffset { + + @Test public void testConstructor() throws Exception { + LongOffset o1 = new LongOffset("12345"); + Field offsetField = LongOffset.class.getDeclaredField("offset"); + offsetField.setAccessible(true); + Long x = (Long) offsetField.get(o1); + assertEquals(x.longValue(), 12345L); + + o1 = new LongOffset("012345"); + x = (Long) offsetField.get(o1); + assertEquals(x.longValue(), 12345L); + + try { + o1 = new LongOffset("xyz"); + fail("Constructor of LongOffset should have failed w/ mal-formatted numbers"); + } catch (NumberFormatException nfe) { + // expected + } + } + + @Test public void testComparator() { + LongOffset o1 = new LongOffset("11111"); + Offset other = mock(Offset.class); + try { + o1.compareTo(other); + fail("compareTo() should have have failed when comparing to an object of a different class"); + } catch (IllegalArgumentException iae) { + // expected + } + + LongOffset o2 = new LongOffset("-10000"); + assertEquals(o1.compareTo(o2), 1); + LongOffset o3 = new LongOffset("22222"); + assertEquals(o1.compareTo(o3), -1); + LongOffset o4 = new LongOffset("11111"); + assertEquals(o1.compareTo(o4), 0); + } + + @Test public void testEquals() { + LongOffset o1 = new LongOffset("12345"); + Offset other = mock(Offset.class); + assertFalse(o1.equals(other)); + + LongOffset o2 = new LongOffset("0012345"); + assertTrue(o1.equals(o2)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java new file mode 100644 index 0000000..d994486 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestOperators { + + private class TestMessage implements Message<String, Object> { + private final long timestamp; + private final String key; + private final Object msg; + + + TestMessage(String key, Object msg, long timestamp) { + this.timestamp = timestamp; + this.key = key; + this.msg = msg; + } + + @Override public Object getMessage() { + return this.msg; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } + } + + @Test public void testGetStreamOperator() { + Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { { + this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L)); + } }; + Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn); + assertEquals(strmOp.getFunction(), transformFn); + assertTrue(strmOp.getOutputStream() instanceof MessageStream); + } + + @Test public void testGetSinkOperator() { + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { }; + Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn); + assertEquals(sinkOp.getFunction(), sinkFn); + assertTrue(sinkOp.getOutputStream() == null); + } + + @Test public void testGetWindowOperator() { + WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null; + Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class); + Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class); + MessageStream<TestMessage> mockInput = mock(MessageStream.class); + when(windowFn.getTransformFunc()).thenReturn(xFunction); + when(windowFn.getStoreFuncs()).thenReturn(storeFns); + when(windowFn.getTrigger()).thenReturn(trigger); + when(mockInput.toString()).thenReturn("mockStream1"); + + Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn); + assertEquals(windowOp.getFunction(), xFunction); + assertEquals(windowOp.getStoreFunctions(), storeFns); + assertEquals(windowOp.getTrigger(), trigger); + assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString())); + } + + @Test public void testGetPartialJoinOperator() { + BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger = + (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(), + Math.max(m1.getTimestamp(), m2.getTimestamp())); + MessageStream<TestMessage> joinOutput = new MessageStream<>(); + Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin = + Operators.getPartialJoinOperator(merger, joinOutput); + + assertEquals(partialJoin.getOutputStream(), joinOutput); + Message<Object, Object> m = mock(Message.class); + Message<Object, Object> s = mock(Message.class); + assertEquals(partialJoin.getFunction(), merger); + assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); + assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m); + assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); + assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater()); + } + + @Test public void testGetMergeOperator() { + MessageStream<TestMessage> output = new MessageStream<>(); + Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output); + Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { { + this.add(t); + } }; + TestMessage t = mock(TestMessage.class); + assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t)); + assertEquals(mergeOp.getOutputStream(), output); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java new file mode 100644 index 0000000..0f35a7c --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + + +public class TestTrigger { + + @Test public void testConstructor() throws Exception { + BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; + BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; + Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); + Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { + s.setOutputValue(0); + return s; + }; + Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { + s.setOutputValue(1); + return s; + }; + + Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, + earlyTriggerUpdater, lateTriggerUpdater); + + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdaterField.setAccessible(true); + lateTriggerUpdaterField.setAccessible(true); + + assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); + assertEquals(timerTrigger, timerTriggerField.get(trigger)); + assertEquals(lateTrigger, lateTriggerField.get(trigger)); + assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); + assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java new file mode 100644 index 0000000..268c9fc --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +public class TestWindowOutput { + + @Test public void testConstructor() { + WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); + assertEquals(wndOutput.getKey(), "testMsg"); + assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); + assertFalse(wndOutput.isDelete()); + assertEquals(wndOutput.getTimestamp(), 0); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/README.md ---------------------------------------------------------------------- diff --git a/samza-operator/README.md b/samza-operator/README.md new file mode 100644 index 0000000..15d2092 --- /dev/null +++ b/samza-operator/README.md @@ -0,0 +1,17 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +samza-operator is an experimental module that is under development (SAMZA-552). http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java new file mode 100644 index 0000000..1eee2dc --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.Operator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + + +/** + * Implementation class for a chain of operators from the single input {@code source} + * + * @param <M> type of message in the input stream {@code source} + */ +public class ChainedOperators<M extends Message> { + + private final Set<OperatorImpl> subscribers = new HashSet<>(); + + /** + * Private constructor + * + * @param source the input source {@link MessageStream} + * @param context the {@link TaskContext} object that we need to instantiate the state stores + */ + private ChainedOperators(MessageStream<M> source, TaskContext context) { + // create the pipeline/topology starting from source + source.getSubscribers().forEach(sub -> { + // pass in the context s.t. stateful stream operators can initialize their stores + OperatorImpl subImpl = this.createAndSubscribe(sub, source, context); + this.subscribers.add(subImpl); + }); + } + + /** + * Private function to recursively instantiate the implementation of operators and the chains + * + * @param operator the operator that subscribe to {@code source} + * @param source the source {@link MessageStream} + * @param context the context of the task + * @return the implementation object of the corresponding {@code operator} + */ + private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source, + TaskContext context) { + Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator); + if (factoryEntry.getValue()) { + // The operator has already been instantiated and we do not need to traverse and create the subscribers any more. + return factoryEntry.getKey(); + } + OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey(); + MessageStream outStream = operator.getOutputStream(); + Collection<Operator> subs = outStream.getSubscribers(); + subs.forEach(sub -> { + OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context); + opImpl.subscribe(subImpl); + }); + // initialize the operator's state store + opImpl.init(source, context); + return opImpl; + } + + /** + * Static method to create a {@link ChainedOperators} from the {@code source} stream + * + * @param source the input source {@link MessageStream} + * @param context the {@link TaskContext} object used to initialize the {@link StateStoreImpl} + * @param <M> the type of input {@link Message} + * @return a {@link ChainedOperators} object takes the {@code source} as input + */ + public static <M extends Message> ChainedOperators create(MessageStream<M> source, TaskContext context) { + return new ChainedOperators<>(source, context); + } + + /** + * Method to navigate the incoming {@code message} through the processing chains + * + * @param message the incoming message to this {@link ChainedOperators} + * @param collector the {@link MessageCollector} object within the process context + * @param coordinator the {@link TaskCoordinator} object within the process context + */ + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator)); + } + + /** + * Method to handle timer events + * + * @param collector the {@link MessageCollector} object within the process context + * @param coordinator the {@link TaskCoordinator} object within the process context + */ + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + long nanoTime = System.nanoTime(); + this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java new file mode 100644 index 0000000..ea90878 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.*; +import org.apache.samza.operators.internal.WindowOutput; +import org.apache.samza.operators.impl.join.PartialJoinOpImpl; +import org.apache.samza.operators.impl.window.SessionWindowImpl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * The factory class that instantiates all implementation of {@link OperatorImpl} classes. + */ +public class OperatorFactory { + + /** + * the static operatorMap that includes all operator implementation instances + */ + private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> OPERATOR_MAP = new ConcurrentHashMap<>(); + + /** + * The method to actually create the implementation instances of operators + * + * @param operator the immutable definition of {@link Operator} + * @param <M> type of input {@link Message} + * @param <RM> type of output {@link Message} + * @return the implementation object of {@link OperatorImpl} + */ + private static <M extends Message, RM extends Message> OperatorImpl<M, ? extends Message> createOperator(Operator<RM> operator) { + if (operator instanceof StreamOperator) { + return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator); + } else if (operator instanceof SinkOperator) { + return new SinkOperatorImpl<>((SinkOperator<M>) operator); + } else if (operator instanceof WindowOperator) { + return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator); + } else if (operator instanceof PartialJoinOperator) { + return new PartialJoinOpImpl<>((PartialJoinOperator) operator); + } + throw new IllegalArgumentException( + String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName())); + } + + /** + * The method to get the unique implementation instance of {@link Operator} + * + * @param operator the {@link Operator} to instantiate + * @param <M> type of input {@link Message} + * @param <RM> type of output {@link Message} + * @return A pair of entry that include the unique implementation instance to the {@code operator} and a boolean value indicating whether + * the operator instance has already been created or not. True means the operator instance has already created, false means the operator + * was not created. + */ + public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) { + if (!OPERATOR_MAP.containsKey(operator)) { + OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator); + if (OPERATOR_MAP.putIfAbsent(operator, operatorImpl) == null) { + return new Entry<OperatorImpl<M, ? extends Message>, Boolean>(operatorImpl, false) { }; + } + } + return new Entry<OperatorImpl<M, ? extends Message>, Boolean>((OperatorImpl<M, ? extends Message>) OPERATOR_MAP.get(operator), true) { }; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java new file mode 100644 index 0000000..efa6a96 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.reactivestreams.Processor; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.HashSet; +import java.util.Set; + + +/** + * Abstract base class for all stream operator implementation classes. + */ +public abstract class OperatorImpl<M extends Message, RM extends Message> + implements Processor<ProcessorContext<M>, ProcessorContext<RM>> { + + private final Set<Subscriber<? super ProcessorContext<RM>>> subscribers = new HashSet<>(); + + @Override public void subscribe(Subscriber<? super ProcessorContext<RM>> s) { + // Only add once + subscribers.add(s); + } + + @Override public void onSubscribe(Subscription s) { + + } + + @Override public void onNext(ProcessorContext<M> o) { + + onNext(o.getMessage(), o.getCollector(), o.getCoordinator()); + } + + @Override public void onError(Throwable t) { + + } + + @Override public void onComplete() { + + } + + /** + * Default method for timer event + * + * @param nanoTime the system nano-second when the timer event is triggered + * @param collector the {@link MessageCollector} in the context + * @param coordinator the {@link TaskCoordinator} in the context + */ + public void onTimer(long nanoTime, MessageCollector collector, TaskCoordinator coordinator) { + this.subscribers.forEach(sub -> ((OperatorImpl) sub).onTimer(nanoTime, collector, coordinator)); + } + + /** + * Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers. + * + * @param message the input {@link Message} + * @param collector the {@link MessageCollector} in the context + * @param coordinator the {@link TaskCoordinator} in the context + */ + protected abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); + + /** + * Stateful operators will need to override this method to initialize the operators + * + * @param source the source that this {@link OperatorImpl} object subscribe to + * @param context the task context to initialize the operators within + */ + protected void init(MessageStream<M> source, TaskContext context) {}; + + /** + * Method to trigger all downstream operators that consumes the output {@link MessageStream} + * from this operator + * + * @param omsg output {@link Message} + * @param collector the {@link MessageCollector} in the context + * @param coordinator the {@link TaskCoordinator} in the context + */ + protected void nextProcessors(RM omsg, MessageCollector collector, TaskCoordinator coordinator) { + subscribers.forEach(sub -> + sub.onNext(new ProcessorContext<>(omsg, collector, coordinator)) + ); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java new file mode 100644 index 0000000..cc7ef2b --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Wrapper class to be used by {@link OperatorImpl} + * + * @param <M> Type of input stream {@link Message} + */ +public class ProcessorContext<M extends Message> { + private final M message; + private final MessageCollector collector; + private final TaskCoordinator coordinator; + + ProcessorContext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.message = message; + this.collector = collector; + this.coordinator = coordinator; + } + + M getMessage() { + return this.message; + } + + MessageCollector getCollector() { + return this.collector; + } + + TaskCoordinator getCoordinator() { + return this.coordinator; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java new file mode 100644 index 0000000..b0f4f27 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.StreamOperator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + +import java.util.Collection; +import java.util.function.Function; + + +/** + * Base class for all implementation of operators + * + * @param <M> type of message in the input stream + * @param <RM> type of message in the output stream + */ +public class SimpleOperatorImpl<M extends Message, RM extends Message> extends OperatorImpl<M, RM> { + + private final Function<M, Collection<RM>> transformFn; + + SimpleOperatorImpl(StreamOperator<M, RM> op) { + super(); + this.transformFn = op.getFunction(); + } + + @Override protected void onNext(M imsg, MessageCollector collector, TaskCoordinator coordinator) { + // actually calling the transform function and then for each output, call nextProcessors() + this.transformFn.apply(imsg).forEach(r -> this.nextProcessors(r, collector, coordinator)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java new file mode 100644 index 0000000..a8a639e --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.internal.Operators.SinkOperator; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.data.Message; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation for {@link SinkOperator} + */ +public class SinkOperatorImpl<M extends Message> extends OperatorImpl<M, Message> { + private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFunc; + + SinkOperatorImpl(SinkOperator<M> sinkOp) { + this.sinkFunc = sinkOp.getFunction(); + } + + @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.sinkFunc.apply(message, collector, coordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java new file mode 100644 index 0000000..7840b5b --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.StoreFunctions; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.TaskContext; + + +/** + * The base class for all state stores + */ +public class StateStoreImpl<M extends Message, SK, SS> { + private final String storeName; + private final StoreFunctions<M, SK, SS> storeFunctions; + private KeyValueStore<SK, SS> kvStore = null; + + public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) { + this.storeFunctions = store; + this.storeName = storeName; + } + + public void init(TaskContext context) { + this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName); + } + + public Entry<SK, SS> getState(M m) { + SK key = this.storeFunctions.getStoreKeyFinder().apply(m); + SS state = this.kvStore.get(key); + return new Entry<>(key, state); + } + + public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) { + SS newValue = this.storeFunctions.getStateUpdater().apply(m, oldEntry.getValue()); + this.kvStore.put(oldEntry.getKey(), newValue); + return new Entry<>(oldEntry.getKey(), newValue); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java new file mode 100644 index 0000000..4238d45 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.join; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.PartialJoinOperator; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation of a {@link PartialJoinOperator}. This class implements function + * that only takes in one input stream among all inputs to the join and generate the join output. + * + * @param <M> Type of input stream {@link Message} + * @param <RM> Type of join output stream {@link Message} + */ +public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> { + + public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) { + // TODO: implement PartialJoinOpImpl constructor + } + + @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + // TODO: implement PartialJoinOpImpl processing logic + } +}
