http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java deleted file mode 100644 index 14e6562..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.operators.data.Message; - - -public class TestOutputMessage implements Message<String, Integer> { - private final String key; - private final Integer value; - private final long timestamp; - - public TestOutputMessage(String key, Integer value, long timestamp) { - this.key = key; - this.value = value; - this.timestamp = timestamp; - } - - @Override public Integer getMessage() { - return this.value; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } -} -
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java new file mode 100644 index 0000000..284b30b --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.MessageEnvelope; + + +public class TestOutputMessageEnvelope implements MessageEnvelope<String, Integer> { + private final String key; + private final Integer value; + + public TestOutputMessageEnvelope(String key, Integer value) { + this.key = key; + this.value = value; + } + + @Override + public Integer getMessage() { + return this.value; + } + + @Override + public String getKey() { + return this.key; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java deleted file mode 100644 index 927b14b..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import org.junit.Before; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestTriggerBuilder { - private Field earlyTriggerField; - private Field lateTriggerField; - private Field timerTriggerField; - private Field earlyTriggerUpdater; - private Field lateTriggerUpdater; - - @Before - public void testPrep() throws Exception { - this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); - this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); - this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); - this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); - this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); - - this.earlyTriggerField.setAccessible(true); - this.lateTriggerField.setAccessible(true); - this.timerTriggerField.setAccessible(true); - this.earlyTriggerUpdater.setAccessible(true); - this.lateTriggerUpdater.setAccessible(true); - } - - @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - 
when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - when(mockState.getNumberMessages()).thenReturn(2000L); - assertTrue(triggerField.apply(null, mockState)); - - Function<TestMessage, Boolean> tokenFunc = m -> true; - builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - TestMessage m = mock(TestMessage.class); - assertTrue(triggerField.apply(m, mockState)); - - builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); - when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); - when(m.getTimestamp()).thenReturn(19999000000L); - assertFalse(triggerField.apply(m, mockState)); - when(m.getTimestamp()).thenReturn(32000000000L); - assertTrue(triggerField.apply(m, mockState)); - when(m.getTimestamp()).thenReturn(1001000000L); - when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); - assertTrue(triggerField.apply(m, mockState)); - - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class); - builder = TriggerBuilder.earlyTrigger(mockFunc); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - assertEquals(triggerField, mockFunc); - - builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); - Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = - (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getFirstMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 
1 second to fire up the timerTrigger before assertion - when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - - builder = TriggerBuilder.timeoutSinceLastMessage(10000L); - timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getLastMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); - assertFalse(timerTrigger.apply(mockState)); - } - - @Test public void testAddTimerTriggers() throws IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addTimeoutSinceFirstMessage(10000L); - // exam that both earlyTrigger and timer triggers are set up - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - // check the timer trigger - Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = - (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getFirstMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - 
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - - // exam that both early trigger and timer triggers are set up - builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - builder.addTimeoutSinceLastMessage(20000L); - // check the timer trigger - timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getLastMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - } - - @Test public void testAddLateTriggers() throws IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addLateTriggerOnSizeLimit(10000L); - // exam that both earlyTrigger and lateTriggers are set up - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(earlyTrigger.apply(null, mockState)); - // check the late trigger - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger = - (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); - assertFalse(lateTrigger.apply(null, mockState)); - // set the number of messages to 10001 to trigger the late trigger - when(mockState.getNumberMessages()).thenReturn(10001L); - assertTrue(lateTrigger.apply(null, mockState)); - - builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); - // exam that both earlyTrigger and lateTriggers are set up - earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(earlyTrigger.apply(null, mockState)); - // exam the lateTrigger - when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); - lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); - assertFalse(lateTrigger.apply(null, mockState)); - List<TestMessage> mockList = mock(ArrayList.class); - when(mockList.size()).thenReturn(200); - when(mockState.getOutputValue()).thenReturn(mockList); - assertTrue(lateTrigger.apply(null, mockState)); - } - - @Test public void testAddTriggerUpdater() throws IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.onEarlyTrigger(c -> { - c.clear(); - return c; - }); - List<TestMessage> collection = new ArrayList<TestMessage>() { { - for (int i = 0; i < 10; i++) { - this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime())); - } - } }; - // exam that earlyTriggerUpdater is set up - Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater = - (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder); - 
WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getOutputValue()).thenReturn(collection); - earlyTriggerUpdater.apply(mockState); - assertTrue(collection.isEmpty()); - - collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime())); - collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime())); - builder.onLateTrigger(c -> { - c.removeIf(t -> t.getKey().equals("key-to-remove")); - return c; - }); - // check the late trigger updater - Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater = - (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder); - when(mockState.getOutputValue()).thenReturn(collection); - lateTriggerUpdater.apply(mockState); - assertTrue(collection.size() == 1); - assertFalse(collection.get(0).isDelete()); - assertEquals(collection.get(0).getKey(), "key-to-stay"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java deleted file mode 100644 index 8a25a96..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.operators.Windows.Window; -import org.apache.samza.operators.internal.Trigger; -import org.apache.samza.operators.internal.WindowOutput; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestWindows { - - @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { - // test constructing the default session window - Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions( - TestMessage::getKey); - assertTrue(testWnd instanceof Windows.SessionWindow); - Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction"); - Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator"); - wndKeyFuncField.setAccessible(true); - aggregatorField.setAccessible(true); - Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd); - assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key"); - BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc = - (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd); - TestMessage mockMsg = mock(TestMessage.class); - Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); - assertTrue(collection.size() == 1); - assertTrue(collection.contains(mockMsg)); - - // test constructing the session window w/ customized session info - Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions( - m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0)); - assertTrue(testWnd2 instanceof Windows.SessionWindow); - wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2); - aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2); - assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0"); - when(mockMsg.getMessage()).thenReturn("x-001"); - collection = aggrFunc.apply(mockMsg, new ArrayList<>()); - assertTrue(collection.size() == 1); - assertTrue(collection.contains('x')); - - // test constructing session window w/ a default counter - Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( - m -> String.format("key-%d", m.getTimestamp())); - assertTrue(testCounter instanceof Windows.SessionWindow); - wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter); - BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter); - when(mockMsg.getTimestamp()).thenReturn(12345L); - assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); - assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); - } - - @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { - Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( - m -> String.format("key-%d", 
m.getTimestamp())); - // test session window w/ a trigger - TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); - testCounter.setTriggers(triggerBuilder); - Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); - Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); - // examine all trigger fields are expected - Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); - Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); - Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); - Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); - Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); - earlyTriggerField.setAccessible(true); - lateTriggerField.setAccessible(true); - timerTriggerField.setAccessible(true); - earlyTriggerUpdater.setAccessible(true); - lateTriggerUpdater.setAccessible(true); - assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); - assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); - assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); - assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); - assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java index b734e87..7bd62a7 100644 --- 
a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java @@ -30,9 +30,10 @@ import static org.mockito.Mockito.when; public class TestIncomingSystemMessage { - @Test public void testConstructor() { + @Test + public void testConstructor() { IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class); - IncomingSystemMessage ism = new IncomingSystemMessage(ime); + IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime); Object mockKey = mock(Object.class); Object mockValue = mock(Object.class); http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java index 943c47f..7838896 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java @@ -28,7 +28,8 @@ import static org.mockito.Mockito.mock; public class TestLongOffset { - @Test public void testConstructor() throws Exception { + @Test + public void testConstructor() throws Exception { LongOffset o1 = new LongOffset("12345"); Field offsetField = LongOffset.class.getDeclaredField("offset"); offsetField.setAccessible(true); @@ -47,7 +48,8 @@ public class TestLongOffset { } } - @Test public void testComparator() { + @Test + public void testComparator() { LongOffset o1 = new LongOffset("11111"); Offset other = mock(Offset.class); try { @@ -65,7 +67,8 @@ public class TestLongOffset { assertEquals(o1.compareTo(o4), 0); } - @Test public void testEquals() { + @Test + public void testEquals() { LongOffset o1 = new LongOffset("12345"); Offset other = 
mock(Offset.class); assertFalse(o1.equals(other)); http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java deleted file mode 100644 index d994486..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.internal; - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestOperators { - - private class TestMessage implements Message<String, Object> { - private final long timestamp; - private final String key; - private final Object msg; - - - TestMessage(String key, Object msg, long timestamp) { - this.timestamp = timestamp; - this.key = key; - this.msg = msg; - } - - @Override public Object getMessage() { - return this.msg; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } - } - - @Test public void testGetStreamOperator() { - Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { { - this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L)); - } }; - Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn); - assertEquals(strmOp.getFunction(), transformFn); - assertTrue(strmOp.getOutputStream() instanceof MessageStream); - } - - @Test public void testGetSinkOperator() { - MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { }; - Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn); - assertEquals(sinkOp.getFunction(), sinkFn); - assertTrue(sinkOp.getOutputStream() == null); - } - - @Test public void testGetWindowOperator() { - WindowFn<TestMessage, String, 
WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class); - BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null; - Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class); - Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class); - MessageStream<TestMessage> mockInput = mock(MessageStream.class); - when(windowFn.getTransformFunc()).thenReturn(xFunction); - when(windowFn.getStoreFuncs()).thenReturn(storeFns); - when(windowFn.getTrigger()).thenReturn(trigger); - when(mockInput.toString()).thenReturn("mockStream1"); - - Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn); - assertEquals(windowOp.getFunction(), xFunction); - assertEquals(windowOp.getStoreFunctions(), storeFns); - assertEquals(windowOp.getTrigger(), trigger); - assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString())); - } - - @Test public void testGetPartialJoinOperator() { - BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger = - (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(), - Math.max(m1.getTimestamp(), m2.getTimestamp())); - MessageStream<TestMessage> joinOutput = new MessageStream<>(); - Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin = - Operators.getPartialJoinOperator(merger, joinOutput); - - assertEquals(partialJoin.getOutputStream(), joinOutput); - Message<Object, Object> m = mock(Message.class); - Message<Object, Object> s = mock(Message.class); - assertEquals(partialJoin.getFunction(), merger); - assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); - assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m); - 
assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); - assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater()); - } - - @Test public void testGetMergeOperator() { - MessageStream<TestMessage> output = new MessageStream<>(); - Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output); - Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { { - this.add(t); - } }; - TestMessage t = mock(TestMessage.class); - assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t)); - assertEquals(mergeOp.getOutputStream(), output); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java deleted file mode 100644 index 0f35a7c..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.internal; - -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; - - -public class TestTrigger { - - @Test public void testConstructor() throws Exception { - BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; - BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; - Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); - Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { - s.setOutputValue(0); - return s; - }; - Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { - s.setOutputValue(1); - return s; - }; - - Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, - earlyTriggerUpdater, lateTriggerUpdater); - - Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); - Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); - Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); - Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); - Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); - earlyTriggerField.setAccessible(true); - lateTriggerField.setAccessible(true); - timerTriggerField.setAccessible(true); - earlyTriggerUpdaterField.setAccessible(true); - 
lateTriggerUpdaterField.setAccessible(true); - - assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); - assertEquals(timerTrigger, timerTriggerField.get(trigger)); - assertEquals(lateTrigger, lateTriggerField.get(trigger)); - assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); - assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java deleted file mode 100644 index 268c9fc..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.internal; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - - -public class TestWindowOutput { - - @Test public void testConstructor() { - WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); - assertEquals(wndOutput.getKey(), "testMsg"); - assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); - assertFalse(wndOutput.isDelete()); - assertEquals(wndOutput.getTimestamp(), 0); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java new file mode 100644 index 0000000..a91af24 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + + +public class TestTrigger { + + @Test + public void testConstructor() throws Exception { + BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; + BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; + Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); + Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { + s.setOutputValue(0); + return s; + }; + Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { + s.setOutputValue(1); + return s; + }; + + Trigger<MessageEnvelope<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, + earlyTriggerUpdater, lateTriggerUpdater); + + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdaterField.setAccessible(true); + lateTriggerUpdaterField.setAccessible(true); + + assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); + assertEquals(timerTrigger, 
timerTriggerField.get(trigger)); + assertEquals(lateTrigger, lateTriggerField.get(trigger)); + assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); + assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java new file mode 100644 index 0000000..6a9b55d --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.TestMessageEnvelope; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestTriggerBuilder { + private Field earlyTriggerField; + private Field lateTriggerField; + private Field timerTriggerField; + private Field earlyTriggerUpdater; + private Field lateTriggerUpdater; + + @Before + public void testPrep() throws Exception { + this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); + this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); + this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); + this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); + this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); + + this.earlyTriggerField.setAccessible(true); + this.lateTriggerField.setAccessible(true); + this.timerTriggerField.setAccessible(true); + this.earlyTriggerUpdater.setAccessible(true); + this.lateTriggerUpdater.setAccessible(true); + } + + @Test + public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException { + TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField = + (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + when(mockState.getNumberMessages()).thenReturn(2000L); + assertTrue(triggerField.apply(null, mockState)); + + Function<TestMessageEnvelope, Boolean> tokenFunc = m -> true; + builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); + triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + TestMessageEnvelope m = mock(TestMessageEnvelope.class); + assertTrue(triggerField.apply(m, mockState)); + + builder = TriggerBuilder.earlyTriggerOnEventTime(tm -> tm.getMessage().getEventTime(), 30000L); + triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); + TestMessageEnvelope.MessageType mockInnerMessage; + mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); + when(mockInnerMessage.getEventTime()).thenReturn(19999000000L); + when(m.getMessage()).thenReturn(mockInnerMessage); + assertFalse(triggerField.apply(m, mockState)); + mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); + when(mockInnerMessage.getEventTime()).thenReturn(32000000000L); + when(m.getMessage()).thenReturn(mockInnerMessage); + assertTrue(triggerField.apply(m, mockState)); + mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); + when(m.getMessage()).thenReturn(mockInnerMessage); + when(mockInnerMessage.getEventTime()).thenReturn(1001000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + + BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, 
Boolean> mockFunc = mock(BiFunction.class); + builder = TriggerBuilder.earlyTrigger(mockFunc); + triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + assertEquals(triggerField, mockFunc); + + builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); + Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + builder = TriggerBuilder.timeoutSinceLastMessage(10000L); + timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test + public void testAddTimerTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addTimeoutSinceFirstMessage(10000L); + // exam that both earlyTrigger and timer triggers are set up + BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField = + (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + // check the timer trigger + Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + // exam that both early trigger and timer triggers are set up + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + builder.addTimeoutSinceLastMessage(20000L); + // check the timer trigger + timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test + public void testAddLateTriggers() throws IllegalAccessException { + 
TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTriggerOnSizeLimit(10000L); + // exam that both earlyTrigger and lateTriggers are set up + BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> earlyTrigger = + (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // check the late trigger + BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> lateTrigger = + (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + // set the number of messages to 10001 to trigger the late trigger + when(mockState.getNumberMessages()).thenReturn(10001L); + assertTrue(lateTrigger.apply(null, mockState)); + + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); + // exam that both earlyTrigger and lateTriggers are set up + earlyTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // exam the lateTrigger + when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); + lateTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + List<TestMessageEnvelope> mockList = mock(ArrayList.class); + 
when(mockList.size()).thenReturn(200); + when(mockState.getOutputValue()).thenReturn(mockList); + assertTrue(lateTrigger.apply(null, mockState)); + } + + @Test + public void testAddTriggerUpdater() throws IllegalAccessException { + TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.onEarlyTrigger(c -> { + c.clear(); + return c; + }); + List<TestMessageEnvelope> collection = new ArrayList<TestMessageEnvelope>() { { + for (int i = 0; i < 10; i++) { + this.add(new TestMessageEnvelope(String.format("key-%d", i), "string-value", System.nanoTime())); + } + } }; + // exam that earlyTriggerUpdater is set up + Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> earlyTriggerUpdater = + (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.earlyTriggerUpdater.get(builder); + WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); + when(mockState.getOutputValue()).thenReturn(collection); + earlyTriggerUpdater.apply(mockState); + assertTrue(collection.isEmpty()); + + collection.add(new TestMessageEnvelope("key-to-stay", "string-to-stay", System.nanoTime())); + collection.add(new TestMessageEnvelope("key-to-remove", "string-to-remove", System.nanoTime())); + builder.onLateTrigger(c -> { + c.removeIf(t -> t.getKey().equals("key-to-remove")); + return c; + }); + // check the late trigger updater + Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> lateTriggerUpdater = + (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.lateTriggerUpdater.get(builder); + when(mockState.getOutputValue()).thenReturn(collection); + lateTriggerUpdater.apply(mockState); + assertTrue(collection.size() == 1); + assertFalse(collection.get(0).isDelete()); + 
assertEquals(collection.get(0).getKey(), "key-to-stay"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java new file mode 100644 index 0000000..7f81fc9 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +public class TestWindowOutput { + + @Test + public void testConstructor() { + WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); + assertEquals(wndOutput.getKey(), "testMsg"); + assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); + assertFalse(wndOutput.isDelete()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java new file mode 100644 index 0000000..26af26e --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.TestMessageEnvelope; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestWindows { + + @Test + public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { + // test constructing the default session window + Window<TestMessageEnvelope, String, Collection<TestMessageEnvelope>, WindowOutput<String, Collection<TestMessageEnvelope>>> testWnd = Windows + .intoSessions( + TestMessageEnvelope::getKey); + assertTrue(testWnd instanceof SessionWindow); + Field wndKeyFuncField = SessionWindow.class.getDeclaredField("wndKeyFunction"); + Field aggregatorField = SessionWindow.class.getDeclaredField("aggregator"); + wndKeyFuncField.setAccessible(true); + aggregatorField.setAccessible(true); + Function<TestMessageEnvelope, String> wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd); + assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "test-key"); + BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>> aggrFunc = + (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd); + TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); + Collection<TestMessageEnvelope> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains(mockMsg)); + + // test constructing the session window w/ customized session info + Window<TestMessageEnvelope, String, Collection<Character>, WindowOutput<String, 
Collection<Character>>> testWnd2 = Windows.intoSessions( + m -> String.format("key-%d", m.getMessage().getEventTime()), m -> m.getMessage().getValue().charAt(0)); + assertTrue(testWnd2 instanceof SessionWindow); + wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd2); + aggrFunc = (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd2); + assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "key-0"); + TestMessageEnvelope.MessageType mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); + when(mockMsg.getMessage()).thenReturn(mockInnerMessage); + when(mockInnerMessage.getValue()).thenReturn("x-001"); + collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains('x')); + + // test constructing session window w/ a default counter + Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getMessage().getEventTime())); + assertTrue(testCounter instanceof SessionWindow); + wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testCounter); + BiFunction<TestMessageEnvelope, Integer, Integer> counterFn = (BiFunction<TestMessageEnvelope, Integer, Integer>) aggregatorField.get(testCounter); + when(mockMsg.getMessage().getEventTime()).thenReturn(12345L); + assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); + assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); + } + + @Test + public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { + Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getMessage().getEventTime())); + // test session window w/ a trigger + TriggerBuilder<TestMessageEnvelope, Integer> triggerBuilder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); + testCounter.setTriggers(triggerBuilder); + Trigger<TestMessageEnvelope, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); + Trigger<TestMessageEnvelope, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); + // examine all trigger fields are expected + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdater.setAccessible(true); + lateTriggerUpdater.setAccessible(true); + assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); + assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); + assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); + assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); + assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java new file mode 100644 index 0000000..231d3f5 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -0,0 +1,134 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FilterFunction; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.OperatorSpecs; +import org.apache.samza.operators.windows.Window; +import org.apache.samza.operators.windows.WindowFn; +import org.apache.samza.operators.windows.WindowOutput; +import org.apache.samza.operators.windows.WindowState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; + + +/** + * The implementation for input/output {@link MessageStream}s to/from the operators. + * Users use the {@link MessageStream} API methods to describe and chain the operator specs. + * <p> + * Each transformation method only records an {@link OperatorSpec} in the set of specs registered with the + * stream(s) involved; this class does not process any {@link MessageEnvelope} itself. 
+ * + * @param <M> type of {@link MessageEnvelope}s in this {@link MessageStream} + */ +public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> { + + /** + * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream} + */ + private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>(); + /** + * Registers a stream operator that applies {@code mapFn} to every message of this stream. + * The operator's function returns a single-element list holding the mapped message, or an empty list when + * {@code mapFn} returns {@code null}. The list is an anonymous {@link ArrayList} subclass that is filled in + * its instance initializer. + * + * @param mapFn the function that maps an input message to an output message + * @param <OM> type of the output {@link MessageEnvelope} + * @return the output stream of the newly registered operator + */ + @Override + public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) { + OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperator(m -> new ArrayList<OM>() { { + OM r = mapFn.apply(m); + if (r != null) { + this.add(r); + } + } }); + this.registeredOperatorSpecs.add(op); + return op.getOutputStream(); + } + /** + * Registers a stream operator that is created directly from {@code flatMapFn}. + * + * @param flatMapFn the function handed unchanged to {@code OperatorSpecs.createStreamOperator} + * @param <OM> type of the output {@link MessageEnvelope} + * @return the output stream of the newly registered operator + */ + @Override + public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) { + OperatorSpec<OM> op = OperatorSpecs.createStreamOperator(flatMapFn); + this.registeredOperatorSpecs.add(op); + return op.getOutputStream(); + } + /** + * Registers a stream operator that keeps only the messages accepted by {@code filterFn}. + * The operator's function returns a single-element list holding the input message when + * {@code filterFn.apply} returns {@code true}, and an empty list otherwise. + * + * @param filterFn the predicate applied to every message + * @return the output stream of the newly registered operator + */ + @Override + public MessageStream<M> filter(FilterFunction<M> filterFn) { + OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperator(t -> new ArrayList<M>() { { + if (filterFn.apply(t)) { + this.add(t); + } + } }); + this.registeredOperatorSpecs.add(op); + return op.getOutputStream(); + } + /** + * Registers a sink operator created from {@code sinkFn}. No output stream is returned, so no further + * operator can be chained after this call. + * + * @param sinkFn the function wrapped by the sink operator + */ + @Override + public void sink(SinkFunction<M> sinkFn) { + this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperator(sinkFn)); + } + /** + * Registers a window operator built from the internal window function of {@code window}. + * The internal function is cast to {@code WindowFn<M, WK, WS, WM>} without a runtime check. + * + * @param window the window definition whose internal {@link WindowFn} is used + * @param <WK> type of the window key + * @param <WV> type of the window value + * @param <WS> type of the {@link WindowState} + * @param <WM> type of the {@link WindowOutput} messages + * @return the output stream of the newly registered window operator + */ + @Override + public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window( + Window<M, WK, WV, WM> window) { + OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperator((WindowFn<M, WK, WS, WM>) window.getInternalWindowFn()); + this.registeredOperatorSpecs.add(wndOp); + return wndOp.getOutputStream(); + } + /** + * Creates a new output stream and registers two partial-join operators that both write to it: one on + * {@code otherStream} and one on this stream. The operator registered on {@code otherStream} calls + * {@code joinFn} with its two arguments swapped, so {@code joinFn} always receives (M, JM). + * {@code otherStream} is cast to {@link MessageStreamImpl} without a check. + * + * @param otherStream the stream to join with; must be a {@link MessageStreamImpl} + * @param joinFn the function that combines one message from each stream + * @param <K> type of the join key + * @param <JM> type of the messages in {@code otherStream} + * @param <RM> type of the joined output messages + * @return the new stream that both partial-join operators write to + */ + @Override + public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join( + MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) { + MessageStreamImpl<RM> outputStream = new 
MessageStreamImpl<>(); + + BiFunction<M, JM, RM> parJoin1 = joinFn::apply; + BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m); + + // TODO: need to add default store functions for the two partial join functions + + ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add( + OperatorSpecs.createPartialJoinOperator(parJoin2, outputStream)); + this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperator(parJoin1, outputStream)); + return outputStream; + } + /** + * Creates a new output stream and registers a separate merge operator, each targeting that output stream, + * on this stream and on every stream in {@code otherStreams}. Every element of {@code otherStreams} is cast + * to {@link MessageStreamImpl} without a check. + * <p> + * NOTE(review): this method adds {@code this} to the caller's {@code otherStreams} collection, so the + * argument is mutated; an unmodifiable collection would presumably make {@code add} throw. Consider + * iterating over a private copy instead. + * + * @param otherStreams the streams to merge with this one; modified by this call + * @return the new stream that all merge operators write to + */ + @Override + public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) { + MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(); + + otherStreams.add(this); + otherStreams.forEach(other -> + ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperator(outputStream))); + return outputStream; + } + + /** + * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and + * should not be exposed to users. + * + * @return a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}. + */ + public Collection<OperatorSpec> getRegisteredOperatorSpecs() { + return Collections.unmodifiableSet(this.registeredOperatorSpecs); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java new file mode 100644 index 0000000..2572f14 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.windows.StoreFunctions; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.TaskContext; + + +/** + * The base class for all state stores. + * <p> + * Wraps a {@link KeyValueStore} that is looked up by name from the {@link TaskContext} in + * {@link #init(TaskContext)}, and uses the supplied {@link StoreFunctions} to derive the store key from a + * message and to compute the updated state. The underlying store is {@code null} until + * {@link #init(TaskContext)} has been called, so the state accessors must not be used before that. + * + * @param <M> type of the {@link MessageEnvelope} the store functions operate on + * @param <SK> type of the store key + * @param <SS> type of the stored state + */ +public class StateStoreImpl<M extends MessageEnvelope, SK, SS> { + private final String storeName; + private final StoreFunctions<M, SK, SS> storeFunctions; + private KeyValueStore<SK, SS> kvStore = null; + /** + * Remembers the store functions and the store name; the store itself is not looked up here. + * + * @param store the functions used to compute store keys and updated state + * @param storeName the name passed to {@code TaskContext.getStore} in {@link #init(TaskContext)} + */ + public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) { + this.storeFunctions = store; + this.storeName = storeName; + } + /** + * Looks up the store registered under the configured name and keeps it for later calls. + * The result of {@code getStore} is cast to {@code KeyValueStore<SK, SS>} without a runtime check. + * + * @param context the task context that provides the store + */ + public void init(TaskContext context) { + this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName); + } + /** + * Reads the state that belongs to the given message. + * + * @param m the message whose store key is computed with the store-key function + * @return an entry holding the computed key and whatever the store returns for it + * (presumably {@code null} when nothing is stored yet; confirm against the store implementation) + */ + public Entry<SK, SS> getState(M m) { + SK key = this.storeFunctions.getStoreKeyFn().apply(m); + SS state = this.kvStore.get(key); + return new Entry<>(key, state); + } + /** + * Computes the new state from the message and the value of {@code oldEntry}, writes it to the store and + * returns it. The key is taken from {@code oldEntry}; it is not recomputed from {@code m}. + * + * @param m the message passed to the state-updater function + * @param oldEntry the entry supplying the store key and the previous state value + * @return a new entry holding the key of {@code oldEntry} and the updated state + */ + public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) { + SS newValue = this.storeFunctions.getStateUpdaterFn().apply(m, oldEntry.getValue()); + this.kvStore.put(oldEntry.getKey(), newValue); + return new Entry<>(oldEntry.getKey(), newValue); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java new file mode 100644 index 0000000..152cd92 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.operators.impl.OperatorImpls; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.WindowableTask; + +import java.util.HashMap; +import java.util.Map; + + +/** + * A {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and propagates them + * through the user's stream transformations defined in {@link StreamOperatorTask#transform(Map)} using the + * {@link MessageStream} APIs. + * <p> + * This class brings all the operator API implementation components together and feeds the + * {@link IncomingSystemMessageEnvelope}s into the transformation chains. + * <p> + * It accepts an instance of the user implemented {@link StreamOperatorTask}. When its own {@link #init(Config, TaskContext)} + * method is called during startup, it creates a {@link MessageStreamImpl} corresponding to each of its input + * {@link SystemStreamPartition}s and then calls the user's {@link StreamOperatorTask#transform(Map)} method. + * <p> + * When users invoke the methods on the {@link MessageStream} API to describe their stream transformations in the + * {@link StreamOperatorTask#transform(Map)} method, the underlying {@link MessageStreamImpl} creates the + * corresponding {@link org.apache.samza.operators.spec.OperatorSpec} to record information about the desired + * transformation, and returns the output {@link MessageStream} to allow further transform chaining. 
+ * <p> + * Once the user's transformation DAGs have been described for all {@link MessageStream}s (i.e., when the + * {@link StreamOperatorTask#transform(Map)} call returns), it calls + * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each of the input + * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG + * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the + * root node of the DAG, which this class saves. + * <p> + * Now that it has the root for the DAG corresponding to each {@link SystemStreamPartition}, it can pass the message + * envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} along + * to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates + * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s. + */ +public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask { + + /** + * A mapping from each {@link SystemStreamPartition} to the root node of its operator chain DAG. + */ + private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? 
extends MessageEnvelope>> operatorChains = new HashMap<>(); + /** + * The user-implemented task whose {@code transform} method describes the stream transformations. + */ + private final StreamOperatorTask userTask; + /** + * Wraps the given user task; no operator chain is built until {@link #init(Config, TaskContext)} runs. + * + * @param userTask the user-implemented {@link StreamOperatorTask} to adapt + */ + public StreamOperatorAdaptorTask(StreamOperatorTask userTask) { + this.userTask = userTask; + } + /** + * Builds the operator chains for all input partitions. + * <p> + * The user task is initialized first when it also implements {@link InitableTask}. Then one new + * {@link MessageStreamImpl} is created per {@link SystemStreamPartition} reported by {@code context}, the + * resulting map is passed to the user's {@code transform} method, and finally the operator chain for each + * stream is created and its root is stored under the stream's partition. + * <p> + * NOTE(review): every value of the map is cast back to {@link MessageStreamImpl} after {@code transform} + * returns, which assumes the user task did not replace the map's values; confirm. + * + * @param config the job configuration, forwarded to the user task's {@code init} + * @param context the task context that supplies the input partitions + * @throws Exception as declared; presumably propagated from the user task's {@code init} + */ + @Override + public final void init(Config config, TaskContext context) throws Exception { + if (this.userTask instanceof InitableTask) { + ((InitableTask) this.userTask).init(config, context); + } + Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams = new HashMap<>(); + context.getSystemStreamPartitions().forEach(ssp -> messageStreams.put(ssp, new MessageStreamImpl<>())); + this.userTask.transform(messageStreams); + messageStreams.forEach((ssp, ms) -> + operatorChains.put(ssp, OperatorImpls.createOperatorImpls((MessageStreamImpl<IncomingSystemMessageEnvelope>) ms, context))); + } + /** + * Wraps the incoming envelope in an {@link IncomingSystemMessageEnvelope} and passes it to the root + * operator stored for the envelope's {@link SystemStreamPartition}. + * <p> + * NOTE(review): the map lookup is dereferenced without a null check, so an envelope from a partition that + * was not present at {@code init} time would raise a {@link NullPointerException} here. + * + * @param ime the incoming message envelope + * @param collector the collector forwarded to the operator chain + * @param coordinator the coordinator forwarded to the operator chain + */ + @Override + public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { + this.operatorChains.get(ime.getSystemStreamPartition()) + .onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator); + } + /** + * Forwards the window call to the user task when it also implements {@link WindowableTask}; otherwise + * does nothing. + * <p> + * NOTE(review): nothing is forwarded to the operator chains from here; confirm that this is intended. + * + * @param collector the collector forwarded to the user task + * @param coordinator the coordinator forwarded to the user task + * @throws Exception as declared; presumably propagated from the user task's {@code window} + */ + @Override + public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + if (this.userTask instanceof WindowableTask) { + ((WindowableTask) this.userTask).window(collector, coordinator); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java deleted file mode 100644 index 1eee2dc..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.impl; - -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators.Operator; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - - -/** - * Implementation class for a chain of operators from the single input {@code source} - * - * @param <M> type of message in the input stream {@code source} - */ -public class ChainedOperators<M extends Message> { - - private final Set<OperatorImpl> subscribers = new HashSet<>(); - - /** - * Private constructor - * - * @param source the input source {@link MessageStream} - * @param context the {@link TaskContext} object that we need to instantiate the state stores - */ - private ChainedOperators(MessageStream<M> source, TaskContext context) { - // create the pipeline/topology starting from source - source.getSubscribers().forEach(sub -> { - // pass in the context s.t. 
stateful stream operators can initialize their stores - OperatorImpl subImpl = this.createAndSubscribe(sub, source, context); - this.subscribers.add(subImpl); - }); - } - - /** - * Private function to recursively instantiate the implementation of operators and the chains - * - * @param operator the operator that subscribe to {@code source} - * @param source the source {@link MessageStream} - * @param context the context of the task - * @return the implementation object of the corresponding {@code operator} - */ - private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source, - TaskContext context) { - Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator); - if (factoryEntry.getValue()) { - // The operator has already been instantiated and we do not need to traverse and create the subscribers any more. - return factoryEntry.getKey(); - } - OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey(); - MessageStream outStream = operator.getOutputStream(); - Collection<Operator> subs = outStream.getSubscribers(); - subs.forEach(sub -> { - OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context); - opImpl.subscribe(subImpl); - }); - // initialize the operator's state store - opImpl.init(source, context); - return opImpl; - } - - /** - * Static method to create a {@link ChainedOperators} from the {@code source} stream - * - * @param source the input source {@link MessageStream} - * @param context the {@link TaskContext} object used to initialize the {@link StateStoreImpl} - * @param <M> the type of input {@link Message} - * @return a {@link ChainedOperators} object takes the {@code source} as input - */ - public static <M extends Message> ChainedOperators create(MessageStream<M> source, TaskContext context) { - return new ChainedOperators<>(source, context); - } - - /** - * Method to navigate the incoming {@code message} through the processing 
chains - * - * @param message the incoming message to this {@link ChainedOperators} - * @param collector the {@link MessageCollector} object within the process context - * @param coordinator the {@link TaskCoordinator} object within the process context - */ - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator)); - } - - /** - * Method to handle timer events - * - * @param collector the {@link MessageCollector} object within the process context - * @param coordinator the {@link TaskCoordinator} object within the process context - */ - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - long nanoTime = System.nanoTime(); - this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java deleted file mode 100644 index ea90878..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.internal.Operators.*; -import org.apache.samza.operators.internal.WindowOutput; -import org.apache.samza.operators.impl.join.PartialJoinOpImpl; -import org.apache.samza.operators.impl.window.SessionWindowImpl; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * The factory class that instantiates all implementation of {@link OperatorImpl} classes. - */ -public class OperatorFactory { - - /** - * the static operatorMap that includes all operator implementation instances - */ - private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> OPERATOR_MAP = new ConcurrentHashMap<>(); - - /** - * The method to actually create the implementation instances of operators - * - * @param operator the immutable definition of {@link Operator} - * @param <M> type of input {@link Message} - * @param <RM> type of output {@link Message} - * @return the implementation object of {@link OperatorImpl} - */ - private static <M extends Message, RM extends Message> OperatorImpl<M, ? 
extends Message> createOperator(Operator<RM> operator) { - if (operator instanceof StreamOperator) { - return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator); - } else if (operator instanceof SinkOperator) { - return new SinkOperatorImpl<>((SinkOperator<M>) operator); - } else if (operator instanceof WindowOperator) { - return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator); - } else if (operator instanceof PartialJoinOperator) { - return new PartialJoinOpImpl<>((PartialJoinOperator) operator); - } - throw new IllegalArgumentException( - String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName())); - } - - /** - * The method to get the unique implementation instance of {@link Operator} - * - * @param operator the {@link Operator} to instantiate - * @param <M> type of input {@link Message} - * @param <RM> type of output {@link Message} - * @return A pair of entry that include the unique implementation instance to the {@code operator} and a boolean value indicating whether - * the operator instance has already been created or not. True means the operator instance has already created, false means the operator - * was not created. - */ - public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) { - if (!OPERATOR_MAP.containsKey(operator)) { - OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator); - if (OPERATOR_MAP.putIfAbsent(operator, operatorImpl) == null) { - return new Entry<OperatorImpl<M, ? extends Message>, Boolean>(operatorImpl, false) { }; - } - } - return new Entry<OperatorImpl<M, ? extends Message>, Boolean>((OperatorImpl<M, ? extends Message>) OPERATOR_MAP.get(operator), true) { }; - } - -}
