http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java new file mode 100644 index 0000000..da813b1 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.task; + +import org.apache.samza.operators.MessageStreams.SystemMessageStream; + +import java.util.Collection; + +/** + * This interface defines the methods that user needs to implement via the operator programming APIs. + */ +public interface StreamOperatorTask { + + /** + * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s. + * Users have to implement this function to define their transformation logic on each of the incoming + * {@link SystemMessageStream}. 
+ * + * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition} + * + * @param sources the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.data.IncomingSystemMessage} + * from a {@link org.apache.samza.system.SystemStreamPartition} + */ + void initOperators(Collection<SystemMessageStream> sources); + + +}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java index 963ccf2..adb6264 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java @@ -47,12 +47,12 @@ public interface StorageEngineFactory<K, V> { * @return The storage engine instance. */ public StorageEngine getStorageEngine( - String storeName, - File storeDir, - Serde<K> keySerde, - Serde<V> msgSerde, - MessageCollector collector, - MetricsRegistry registry, - SystemStreamPartition changeLogSystemStreamPartition, - SamzaContainerContext containerContext); + String storeName, + File storeDir, + Serde<K> keySerde, + Serde<V> msgSerde, + MessageCollector collector, + MetricsRegistry registry, + SystemStreamPartition changeLogSystemStreamPartition, + SamzaContainerContext containerContext); } http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java new file mode 100644 index 0000000..8c56287 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.Message; + + +public class TestMessage implements Message<String, String> { + + private final String key; + private final String value; + private final long timestamp; + + TestMessage(String key, String value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public String getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java new file mode 100644 index 0000000..4dbe233 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.internal.Operators.*; +import org.apache.samza.operators.internal.WindowOutput; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestMessageStream { + + @Test public void testMap() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2); + MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestOutputMessage> mapOp = subs.iterator().next(); + assertTrue(mapOp instanceof StreamOperator); + assertEquals(mapOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + TestMessage xTestMsg = 
mock(TestMessage.class); + when(xTestMsg.getKey()).thenReturn("test-msg-key"); + when(xTestMsg.getMessage()).thenReturn("123456789"); + when(xTestMsg.getTimestamp()).thenReturn(12345L); + Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg); + assertEquals(cOutputMsg.size(), 1); + TestOutputMessage outputMessage = cOutputMsg.iterator().next(); + assertEquals(outputMessage.getKey(), xTestMsg.getKey()); + assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1)); + assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2); + } + + @Test public void testFlatMap() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { { + this.add(mock(TestOutputMessage.class)); + this.add(mock(TestOutputMessage.class)); + this.add(mock(TestOutputMessage.class)); + } }; + Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts; + MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestOutputMessage> flatMapOp = subs.iterator().next(); + assertTrue(flatMapOp instanceof StreamOperator); + assertEquals(flatMapOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap); + } + + @Test public void testFilter() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L; + MessageStream<TestMessage> outputStream = inputStream.filter(xFilter); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> filterOp = subs.iterator().next(); + 
assertTrue(filterOp instanceof StreamOperator); + assertEquals(filterOp.getOutputStream(), outputStream); + // assert that the transformation function is what we defined above + Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction(); + TestMessage mockMsg = mock(TestMessage.class); + when(mockMsg.getTimestamp()).thenReturn(11111L); + Collection<TestMessage> output = txfmFn.apply(mockMsg); + assertTrue(output.isEmpty()); + when(mockMsg.getTimestamp()).thenReturn(999999L); + output = txfmFn.apply(mockMsg); + assertEquals(output.size(), 1); + assertEquals(output.iterator().next(), mockMsg); + } + + @Test public void testSink() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> { + mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); + tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + }; + inputStream.sink(xSink); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> sinkOp = subs.iterator().next(); + assertTrue(sinkOp instanceof SinkOperator); + assertEquals(((SinkOperator) sinkOp).getFunction(), xSink); + assertNull(((SinkOperator) sinkOp).getOutputStream()); + } + + @Test public void testWindow() { + MessageStream<TestMessage> inputStream = new MessageStream<>(); + Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class); + MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window); + Collection<Operator> subs = inputStream.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> wndOp = subs.iterator().next(); + assertTrue(wndOp instanceof WindowOperator); + assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream); + } + + @Test public void testJoin() { + 
MessageStream<TestMessage> source1 = new MessageStream<>(); + MessageStream<TestMessage> source2 = new MessageStream<>(); + BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp()); + MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner); + Collection<Operator> subs = source1.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> joinOp1 = subs.iterator().next(); + assertTrue(joinOp1 instanceof PartialJoinOperator); + assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput); + subs = source2.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> joinOp2 = subs.iterator().next(); + assertTrue(joinOp2 instanceof PartialJoinOperator); + assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput); + TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L); + TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L); + TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + assertEquals(xOut.getTimestamp(), 11111L); + xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + assertEquals(xOut.getTimestamp(), 11111L); + } + + @Test public void testMerge() { + MessageStream<TestMessage> merge1 = new MessageStream<>(); + Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>() { { + this.add(new MessageStream<>()); + this.add(new MessageStream<>()); + } }; + MessageStream<TestMessage> mergeOutput = merge1.merge(others); + validateMergeOperator(merge1, mergeOutput); + + 
others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); + } + + private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) { + Collection<Operator> subs = mergeSource.getSubscribers(); + assertEquals(subs.size(), 1); + Operator<TestMessage> mergeOp = subs.iterator().next(); + assertTrue(mergeOp instanceof StreamOperator); + assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput); + TestMessage mockMsg = mock(TestMessage.class); + Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg); + assertEquals(outputs.size(), 1); + assertEquals(outputs.iterator().next(), mockMsg); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java new file mode 100644 index 0000000..c5fcceb --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.Partition; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class TestMessageStreams { + + @Test public void testInput() { + SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0)); + MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp); + assertEquals(mSysStream.getSystemStreamPartition(), ssp); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java new file mode 100644 index 0000000..14e6562 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.Message; + + +public class TestOutputMessage implements Message<String, Integer> { + private final String key; + private final Integer value; + private final long timestamp; + + public TestOutputMessage(String key, Integer value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public Integer getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java new file mode 100644 index 0000000..e6d9e4a --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestTriggerBuilder { + private Field earlyTriggerField; + private Field lateTriggerField; + private Field timerTriggerField; + private Field earlyTriggerUpdater; + private Field lateTriggerUpdater; + + @Before + public void testPrep() throws Exception { + this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); + this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); + this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); + this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); + this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); + + this.earlyTriggerField.setAccessible(true); + this.lateTriggerField.setAccessible(true); + this.timerTriggerField.setAccessible(true); + this.earlyTriggerUpdater.setAccessible(true); + this.lateTriggerUpdater.setAccessible(true); + } + + @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + 
when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + when(mockState.getNumberMessages()).thenReturn(2000L); + assertTrue(triggerField.apply(null, mockState)); + + Function<TestMessage, Boolean> tokenFunc = m -> true; + builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + TestMessage m = mock(TestMessage.class); + assertTrue(triggerField.apply(m, mockState)); + + builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); + when(m.getTimestamp()).thenReturn(19999000000L); + assertFalse(triggerField.apply(m, mockState)); + when(m.getTimestamp()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + when(m.getTimestamp()).thenReturn(1001000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class); + builder = TriggerBuilder.earlyTrigger(mockFunc); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + assertEquals(triggerField, mockFunc); + + builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); + Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 
1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + builder = TriggerBuilder.timeoutSinceLastMessage(10000L); + timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the lastMessageTimeNs to 9 seconds earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test public void testAddTimerTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addTimeoutSinceFirstMessage(10000L); + // examine that both earlyTrigger and timer triggers are set up + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + // check the timer trigger + Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 seconds earlier, giving the test 1 second to fire up the timerTrigger before assertion +
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + // examine that both early trigger and timer triggers are set up + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + builder.addTimeoutSinceLastMessage(20000L); + // check the timer trigger + timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the lastMessageTimeNs to 9 seconds earlier, which is still inside the 20000L timeout added above, so the timerTrigger must not fire yet + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test public void testAddLateTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTriggerOnSizeLimit(10000L); + // examine that both earlyTrigger and lateTriggers are set up + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // check the late trigger + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger = + (BiFunction<TestMessage,
WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + // set the number of messages to 10001 to trigger the late trigger + when(mockState.getNumberMessages()).thenReturn(10001L); + assertTrue(lateTrigger.apply(null, mockState)); + + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); + // examine that both earlyTrigger and lateTriggers are set up + earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // examine the lateTrigger + when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); + lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + List<TestMessage> mockList = mock(ArrayList.class); + when(mockList.size()).thenReturn(200); + when(mockState.getOutputValue()).thenReturn(mockList); + assertTrue(lateTrigger.apply(null, mockState)); + } + + @Test public void testAddTriggerUpdater() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.onEarlyTrigger(c -> { + c.clear(); + return c; + }); + List<TestMessage> collection = new ArrayList<TestMessage>() { { + for (int i = 0; i < 10; i++) { + this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime())); + } + } }; + // examine that earlyTriggerUpdater is set up + Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater = + (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder); +
WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getOutputValue()).thenReturn(collection); + earlyTriggerUpdater.apply(mockState); + assertTrue(collection.isEmpty()); + + collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime())); + collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime())); + builder.onLateTrigger(c -> { + c.removeIf(t -> t.getKey().equals("key-to-remove")); + return c; + }); + // check the late trigger updater + Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater = + (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder); + when(mockState.getOutputValue()).thenReturn(collection); + lateTriggerUpdater.apply(mockState); + assertTrue(collection.size() == 1); + assertFalse(collection.get(0).isDelete()); + assertEquals(collection.get(0).getKey(), "key-to-stay"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java new file mode 100644 index 0000000..8a25a96 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.Windows.Window; +import org.apache.samza.operators.internal.Trigger; +import org.apache.samza.operators.internal.WindowOutput; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestWindows { + + @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { + // test constructing the default session window + Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions( + TestMessage::getKey); + assertTrue(testWnd instanceof Windows.SessionWindow); + Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction"); + Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator"); + wndKeyFuncField.setAccessible(true); + aggregatorField.setAccessible(true); + Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd); + assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key"); + BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc = + (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd); + TestMessage mockMsg = mock(TestMessage.class); + Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains(mockMsg)); + + // test constructing the session window w/ customized session info + Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions( + m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0)); + assertTrue(testWnd2 instanceof Windows.SessionWindow); + wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2); + aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2); + assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0"); + when(mockMsg.getMessage()).thenReturn("x-001"); + collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains('x')); + + // test constructing session window w/ a default counter + Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getTimestamp())); + assertTrue(testCounter instanceof Windows.SessionWindow); + wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter); + BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter); + when(mockMsg.getTimestamp()).thenReturn(12345L); + assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); + assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); + } + + @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { + Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", 
m.getTimestamp())); + // test session window w/ a trigger + TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); + testCounter.setTriggers(triggerBuilder); + Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); + Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); + // examine all trigger fields are expected + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdater.setAccessible(true); + lateTriggerUpdater.setAccessible(true); + assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); + assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); + assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); + assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); + assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java new file mode 100644 index 0000000..b734e87 --- 
/dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestIncomingSystemMessage { + + @Test public void testConstructor() { + IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class); + IncomingSystemMessage ism = new IncomingSystemMessage(ime); + + Object mockKey = mock(Object.class); + Object mockValue = mock(Object.class); + LongOffset testOffset = new LongOffset("12345"); + SystemStreamPartition mockSsp = mock(SystemStreamPartition.class); + + when(ime.getKey()).thenReturn(mockKey); + when(ime.getMessage()).thenReturn(mockValue); + when(ime.getSystemStreamPartition()).thenReturn(mockSsp); + when(ime.getOffset()).thenReturn("12345"); + + assertEquals(ism.getKey(), mockKey); + assertEquals(ism.getMessage(), mockValue); 
+ assertEquals(ism.getSystemStreamPartition(), mockSsp); + assertEquals(ism.getOffset(), testOffset); + assertFalse(ism.isDelete()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java new file mode 100644 index 0000000..943c47f --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.data; + +import org.junit.Test; + +import java.lang.reflect.Field; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + + +public class TestLongOffset { + + @Test public void testConstructor() throws Exception { + LongOffset o1 = new LongOffset("12345"); + Field offsetField = LongOffset.class.getDeclaredField("offset"); + offsetField.setAccessible(true); + Long x = (Long) offsetField.get(o1); + assertEquals(x.longValue(), 12345L); + + o1 = new LongOffset("012345"); + x = (Long) offsetField.get(o1); + assertEquals(x.longValue(), 12345L); + + try { + o1 = new LongOffset("xyz"); + fail("Constructor of LongOffset should have failed w/ mal-formatted numbers"); + } catch (NumberFormatException nfe) { + // expected + } + } + + @Test public void testComparator() { + LongOffset o1 = new LongOffset("11111"); + Offset other = mock(Offset.class); + try { + o1.compareTo(other); + fail("compareTo() should have have failed when comparing to an object of a different class"); + } catch (IllegalArgumentException iae) { + // expected + } + + LongOffset o2 = new LongOffset("-10000"); + assertEquals(o1.compareTo(o2), 1); + LongOffset o3 = new LongOffset("22222"); + assertEquals(o1.compareTo(o3), -1); + LongOffset o4 = new LongOffset("11111"); + assertEquals(o1.compareTo(o4), 0); + } + + @Test public void testEquals() { + LongOffset o1 = new LongOffset("12345"); + Offset other = mock(Offset.class); + assertFalse(o1.equals(other)); + + LongOffset o2 = new LongOffset("0012345"); + assertTrue(o1.equals(o2)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java new file mode 100644 index 
0000000..d994486 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestOperators { + + private class TestMessage implements Message<String, Object> { + private final long timestamp; + private final String key; + private final Object msg; + + + TestMessage(String key, Object msg, long timestamp) { + this.timestamp = timestamp; + this.key = key; + this.msg = msg; + } + + @Override public Object getMessage() { + return this.msg; + } + + @Override public String getKey() { + return 
this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } + } + + @Test public void testGetStreamOperator() { + Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { { + this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L)); + } }; + Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn); + assertEquals(strmOp.getFunction(), transformFn); + assertTrue(strmOp.getOutputStream() instanceof MessageStream); + } + + @Test public void testGetSinkOperator() { + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { }; + Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn); + assertEquals(sinkOp.getFunction(), sinkFn); + assertTrue(sinkOp.getOutputStream() == null); + } + + @Test public void testGetWindowOperator() { + WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null; + Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class); + Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class); + MessageStream<TestMessage> mockInput = mock(MessageStream.class); + when(windowFn.getTransformFunc()).thenReturn(xFunction); + when(windowFn.getStoreFuncs()).thenReturn(storeFns); + when(windowFn.getTrigger()).thenReturn(trigger); + when(mockInput.toString()).thenReturn("mockStream1"); + + Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn); + assertEquals(windowOp.getFunction(), xFunction); + assertEquals(windowOp.getStoreFunctions(), storeFns); + assertEquals(windowOp.getTrigger(), trigger); + assertEquals(windowOp.getStoreName(mockInput), 
String.format("input-mockStream1-wndop-%s", windowOp.toString())); + } + + @Test public void testGetPartialJoinOperator() { + BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger = + (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(), + Math.max(m1.getTimestamp(), m2.getTimestamp())); + MessageStream<TestMessage> joinOutput = new MessageStream<>(); + Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin = + Operators.getPartialJoinOperator(merger, joinOutput); + + assertEquals(partialJoin.getOutputStream(), joinOutput); + Message<Object, Object> m = mock(Message.class); + Message<Object, Object> s = mock(Message.class); + assertEquals(partialJoin.getFunction(), merger); + assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); + assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m); + assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); + assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater()); + } + + @Test public void testGetMergeOperator() { + MessageStream<TestMessage> output = new MessageStream<>(); + Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output); + Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { { + this.add(t); + } }; + TestMessage t = mock(TestMessage.class); + assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t)); + assertEquals(mergeOp.getOutputStream(), output); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java new file mode 100644 
index 0000000..0f35a7c --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + + +public class TestTrigger { + + @Test public void testConstructor() throws Exception { + BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; + BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; + Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); + Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { + s.setOutputValue(0); + return s; + }; + Function<WindowState<Integer>, WindowState<Integer>> 
lateTriggerUpdater = s -> { + s.setOutputValue(1); + return s; + }; + + Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, + earlyTriggerUpdater, lateTriggerUpdater); + + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdaterField.setAccessible(true); + lateTriggerUpdaterField.setAccessible(true); + + assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); + assertEquals(timerTrigger, timerTriggerField.get(trigger)); + assertEquals(lateTrigger, lateTriggerField.get(trigger)); + assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); + assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java new file mode 100644 index 0000000..268c9fc --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +public class TestWindowOutput { + + @Test public void testConstructor() { + WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); + assertEquals(wndOutput.getKey(), "testMsg"); + assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); + assertFalse(wndOutput.isDelete()); + assertEquals(wndOutput.getTimestamp(), 0); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java index 429573b..75de630 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java @@ -94,8 +94,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap { } @Override - public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( - 
Set<SystemStreamPartition> systemStreamPartitions, long timeout) + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { if (blockpollFlag) { http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java index 0e73e18..baf146a 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java @@ -134,7 +134,9 @@ public class TestCoordinatorStreamSystemConsumer { assertEquals(expectedSystemStreamPartition, systemStreamPartition); } - public int getRegisterCount() { return registerCount; } + public int getRegisterCount() { + return registerCount; + } public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>(); http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java deleted file mode 100644 index b5e1028..0000000 --- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.Windows.Window; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators; -import org.apache.samza.operators.api.internal.Operators.Operator; -import org.apache.samza.operators.api.internal.WindowOutput; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to - * directly program the stream processing stages that processes a stream and generate another one. 
- * - * @param <M> Type of message in this stream - */ -public class MessageStream<M extends Message> { - - private final Set<Operator> subscribers = new HashSet<>(); - - /** - * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}. - * - * NOTE: This should only be used by implementation of {@link org.apache.samza.operators.impl.ChainedOperators}, not directly by programmers. - * - * @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object - */ - public Collection<Operator> getSubscribers() { - return Collections.unmodifiableSet(this.subscribers); - } - - /** - * Public API methods start here - */ - - /** - * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value - * - * @param <A> the type of input {@code a} - * @param <B> the type of input {@code b} - * @param <C> the type of input {@code c} - */ - @FunctionalInterface - public interface VoidFunction3 <A, B, C> { - public void apply(A a, B b, C c); - } - - /** - * Method to apply a map function (1:1) on a {@link MessageStream} - * - * @param mapper the mapper function to map one input {@link Message} to one output {@link Message} - * @param <OM> the type of the output {@link Message} in the output {@link MessageStream} - * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} - */ - public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) { - Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{ - OM r = mapper.apply(m); - if (r != null) { - this.add(r); - } - }}); - this.subscribers.add(op); - return op.getOutputStream(); - } - - /** - * Method to apply a flatMap function (1:n) on a {@link MessageStream} - * - * @param flatMapper the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s - * @param <OM> the type of 
the output {@link Message} in the output {@link MessageStream} - * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} - */ - public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) { - Operator<OM> op = Operators.getStreamOperator(flatMapper); - this.subscribers.add(op); - return op.getOutputStream(); - } - - /** - * Method to apply a filter function on a {@link MessageStream} - * - * @param filter the filter function to filter input {@link Message}s from the input {@link MessageStream} - * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream} - */ - public MessageStream<M> filter(Function<M, Boolean> filter) { - Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{ - if (filter.apply(t)) { - this.add(t); - } - }}); - this.subscribers.add(op); - return op.getOutputStream(); - } - - /** - * Method to send an input {@link MessageStream} to an output {@link SystemStream}, and allows the output {@link MessageStream} - * to be consumed by downstream stream operators again. - * - * @param sink the user-defined sink function to send the input {@link Message}s to the external output systems - */ - public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { - this.subscribers.add(Operators.getSinkOperator(sink)); - } - - /** - * Method to perform a window function (i.e. 
a group-by, aggregate function) on a {@link MessageStream} - * - * @param window the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream} - * @param <WK> the type of key in the output {@link Message} from the {@link Window} function - * @param <WV> the type of output value from - * @param <WS> the type of window state kept in the {@link Window} function - * @param <WM> the type of {@link WindowOutput} message from the {@link Window} function - * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream} - */ - public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) { - Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window)); - this.subscribers.add(wndOp); - return wndOp.getOutputStream(); - } - - /** - * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins. 
- * - * @param other the other stream to be joined w/ - * @param merger the common function to merge messages from this {@link MessageStream} and {@code other} - * @param <K> the type of join key - * @param <JM> the type of message in the {@link Message} from the other join stream - * @param <RM> the type of message in the {@link Message} from the join function - * @return the output {@link MessageStream} from the join function {@code joiner} - */ - public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other, - BiFunction<M, JM, RM> merger) { - MessageStream<RM> outputStream = new MessageStream<>(); - - BiFunction<M, JM, RM> parJoin1 = merger::apply; - BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m); - - // TODO: need to add default store functions for the two partial join functions - - other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream)); - this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream)); - return outputStream; - } - - /** - * Method to merge all {@code others} streams w/ this {@link MessageStream}. 
The merging streams must have the same type {@code M} - * - * @param others other streams to be merged w/ this one - * @return the merged output stream - */ - public MessageStream<M> merge(Collection<MessageStream<M>> others) { - MessageStream<M> outputStream = new MessageStream<>(); - - others.add(this); - others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream))); - return outputStream; - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java deleted file mode 100644 index 59dd91c..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.data.IncomingSystemMessage; -import org.apache.samza.system.SystemStreamPartition; - - -/** - * This class defines all methods to create a {@link MessageStream} object. Users can use this to create an {@link MessageStream} - * from a specific input source. - * - */ - -public final class MessageStreams { - - /** - * private constructor to prevent instantiation - */ - private MessageStreams() {} - - /** - * private class for system input/output {@link MessageStream} - */ - public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> { - /** - * The corresponding {@link org.apache.samza.system.SystemStream} - */ - private final SystemStreamPartition ssp; - - /** - * Constructor for input system stream - * - * @param ssp the input {@link SystemStreamPartition} for the input {@link SystemMessageStream} - */ - private SystemMessageStream(SystemStreamPartition ssp) { - this.ssp = ssp; - } - - /** - * Getter for the {@link SystemStreamPartition} of the input - * - * @return the input {@link SystemStreamPartition} - */ - public SystemStreamPartition getSystemStreamPartition() { - return this.ssp; - } - } - - /** - * Public static API methods start here - */ - - /** - * Static API method to create a {@link MessageStream} from a system input stream - * - * @param ssp the input {@link SystemStreamPartition} - * @return the {@link MessageStream} object takes {@code ssp} as the input - */ - public static SystemMessageStream input(SystemStreamPartition ssp) { - return new SystemMessageStream(ssp); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java 
b/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java deleted file mode 100644 index fc3ea37..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Trigger; - -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines a builder of {@link Trigger} object for a {@link Windows.Window}. The triggers are categorized into - * three types: - * - * <p> - * early trigger: defines the condition when the first output from the window function is sent. - * late trigger: defines the condition when the updated output after the first output is sent. - * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers - * </p> - * - * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of the each individual trigger (i.e. OR). 
- * - * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.api.internal} to create triggers - * - * - * @param <M> the type of input {@link Message} to the {@link Windows.Window} - * @param <V> the type of output value from the {@link Windows.Window} - */ -public final class TriggerBuilder<M extends Message, V> { - - /** - * Predicate helper to OR multiple trigger conditions - */ - static class PredicateHelper { - static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) { - return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s); - } - - static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) { - return s -> lhs.apply(s) || rhs.apply(s); - } - } - - /** - * The early trigger condition that determines the first output from the {@link Windows.Window} - */ - private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null; - - /** - * The late trigger condition that determines the late output(s) from the {@link Windows.Window} - */ - private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null; - - /** - * The system timer based trigger conditions that guarantees the {@link Windows.Window} proceeds forward - */ - private Function<WindowState<V>, Boolean> timerTrigger = null; - - /** - * The state updater function to be applied after the first output is triggered - */ - private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity(); - - /** - * The state updater function to be applied after the late output is triggered - */ - private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity(); - - /** - * Helper method to add a trigger condition - * - * @param currentTrigger current trigger condition - * @param newTrigger new trigger condition - * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger} - */ - private BiFunction<M, WindowState<V>, Boolean> 
addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger, - BiFunction<M, WindowState<V>, Boolean> newTrigger) { - if (currentTrigger == null) { - return newTrigger; - } - - return PredicateHelper.or(currentTrigger, newTrigger); - } - - /** - * Helper method to add a system timer trigger - * - * @param currentTimer current timer condition - * @param newTimer new timer condition - * @return combined timer condition that is {@code currentTimer} OR {@code newTimer} - */ - private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer, - Function<WindowState<V>, Boolean> newTimer) { - if (currentTimer == null) { - return newTimer; - } - - return PredicateHelper.or(currentTimer, newTimer); - } - - /** - * default constructor to prevent instantiation - */ - private TriggerBuilder() {} - - /** - * Constructor that set the size limit as the early trigger for a window - * - * @param sizeLimit the number of messages in a window that would trigger the first output - */ - private TriggerBuilder(long sizeLimit) { - this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit; - } - - /** - * Constructor that set the event time length as the early trigger - * - * @param eventTimeFunction the function that calculate the event time in nano-second from the input {@link Message} - * @param wndLenMs the window length in event time in milli-second - */ - private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) { - this.earlyTrigger = (m, s) -> - TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(), - eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs; - } - - /** - * Constructor that set the special token message as the early trigger - * - * @param tokenFunc the function that checks whether an input {@link Message} is a token message that triggers window output - */ - private TriggerBuilder(Function<M, Boolean> tokenFunc) { - this.earlyTrigger = (m, s) -> 
tokenFunc.apply(m); - } - - /** - * Build method that creates an {@link Trigger} object based on the trigger conditions set in {@link TriggerBuilder} - * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object - * - * @return the final {@link Trigger} object - */ - Trigger<M, WindowState<V>> build() { - return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater); - } - - /** - * Public API methods start here - */ - - - /** - * API method to allow users to set an update method to update the output value after the first window output is triggered - * by the early trigger condition - * - * @param onTriggerFunc the method to update the output value after the early trigger - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) { - this.earlyTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; }; - return this; - } - - /** - * API method to allow users to set an update method to update the output value after a late window output is triggered - * by the late trigger condition - * - * @param onTriggerFunc the method to update the output value after the late trigger - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) { - this.lateTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; }; - return this; - } - - /** - * API method to allow users to add a system timer trigger based on timeout after the last message received in the window - * - * @param timeoutMs the timeout in ms after the last message received in the window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) { - this.timerTrigger = 
this.addTimerTrigger(this.timerTrigger, - s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); - return this; - } - - /** - * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window - * - * @param timeoutMs the timeout in ms after the first message received in the window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) { - this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s -> - TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); - return this; - } - - /** - * API method allow users to add a late trigger based on the window size limit - * - * @param sizeLimit limit on the number of messages in window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) { - this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit); - return this; - } - - /** - * API method to allow users to define a customized late trigger function based on input message and the window state - * - * @param lateTrigger the late trigger condition based on input {@link Message} and the current {@link WindowState} - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) { - this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger); - return this; - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit - * - * @param sizeLimit window size limit - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> 
earlyTriggerWhenExceedWndLen(long sizeLimit) { - return new TriggerBuilder<M, V>(sizeLimit); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window - * - * - * @param eventTimeFunc the function to get the event time from the input message - * @param eventTimeWndSizeMs the event time window size in Ms - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) { - return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages - * - * @param tokenFunc the function to determine whether an input message is a window token or not - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) { - return new TriggerBuilder<M, V>(tokenFunc); - } - - /** - * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState} - * - * @param earlyTrigger the user defined early trigger condition - * @param <M> the input message type - * @param <V> the output value from the window - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) { - TriggerBuilder<M, V> newTriggers = new TriggerBuilder<M, V>(); - newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger); - return newTriggers; - } - - /** - * Static API method to 
create a {@link TriggerBuilder} w/ system timeout after the last message received in the window - * - * @param timeoutMs timeout in ms after the last message received - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) { - return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window - * - * @param timeoutMs timeout in ms after the first message received - * @param <M> the type of input {@link Message} - * @param <V> the type of {@link Windows.Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) { - return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java deleted file mode 100644 index 402cc42..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -/** - * This interface defines the methods a window state class has to implement. The programmers are allowed to implement - * customized window state to be stored in window state stores by implementing this interface class. - * - * @param <WV> the type for window output value - */ -public interface WindowState<WV> { - /** - * Method to get the system time when the first message in the window is received - * - * @return nano-second of system time for the first message received in the window - */ - long getFirstMessageTimeNs(); - - /** - * Method to get the system time when the last message in the window is received - * - * @return nano-second of system time for the last message received in the window - */ - long getLastMessageTimeNs(); - - /** - * Method to get the earliest event time in the window - * - * @return the earliest event time in nano-second in the window - */ - long getEarliestEventTimeNs(); - - /** - * Method to get the latest event time in the window - * - * @return the latest event time in nano-second in the window - */ - long getLatestEventTimeNs(); - - /** - * Method to get the total number of messages received in the window - * - * @return number of messages in the window - */ - long getNumberMessages(); - - /** - * Method to get the corresponding window's output value - * - * @return the corresponding window's output value - */ - WV getOutputValue(); - - /** - * Method to set the corresponding window's output value - * - * @param value the corresponding window's output value - */ - 
void setOutputValue(WV value); - -}