http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java new file mode 100644 index 0000000..47a37dc --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api; + +import org.apache.samza.operators.api.Windows.Window; +import org.apache.samza.operators.api.internal.Trigger; +import org.apache.samza.operators.api.internal.WindowOutput; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestWindows { + + @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { + // test constructing the default session window + Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions( + TestMessage::getKey); + assertTrue(testWnd instanceof Windows.SessionWindow); + Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction"); + Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator"); + wndKeyFuncField.setAccessible(true); + aggregatorField.setAccessible(true); + Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd); + assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key"); + BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc = + (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd); + TestMessage mockMsg = mock(TestMessage.class); + Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains(mockMsg)); + + // test constructing the session window w/ customized session info + Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions( + m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0)); + assertTrue(testWnd2 instanceof Windows.SessionWindow); + wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2); + aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2); + assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0"); + when(mockMsg.getMessage()).thenReturn("x-001"); + collection = aggrFunc.apply(mockMsg, new ArrayList<>()); + assertTrue(collection.size() == 1); + assertTrue(collection.contains('x')); + + // test constructing session window w/ a default counter + Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getTimestamp())); + assertTrue(testCounter instanceof Windows.SessionWindow); + wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter); + BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter); + when(mockMsg.getTimestamp()).thenReturn(12345L); + assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); + assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); + } + + @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { + Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( + m -> String.format("key-%d", m.getTimestamp())); + // test session window w/ a trigger + TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); + testCounter.setTriggers(triggerBuilder); + Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); + Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); + // examine all trigger fields are expected + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdater.setAccessible(true); + lateTriggerUpdater.setAccessible(true); + assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); + assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); + assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); + assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); + assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger)); + } +}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java new file mode 100644 index 0000000..e953078 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api.data; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestIncomingSystemMessage { + + @Test public void testConstructor() { + IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class); + IncomingSystemMessage ism = new IncomingSystemMessage(ime); + + Object mockKey = mock(Object.class); + Object mockValue = mock(Object.class); + LongOffset testOffset = new LongOffset("12345"); + SystemStreamPartition mockSsp = mock(SystemStreamPartition.class); + + when(ime.getKey()).thenReturn(mockKey); + when(ime.getMessage()).thenReturn(mockValue); + when(ime.getSystemStreamPartition()).thenReturn(mockSsp); + when(ime.getOffset()).thenReturn("12345"); + + assertEquals(ism.getKey(), mockKey); + assertEquals(ism.getMessage(), mockValue); + assertEquals(ism.getSystemStreamPartition(), mockSsp); + assertEquals(ism.getOffset(), testOffset); + assertFalse(ism.isDelete()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java new file mode 100644 index 0000000..10775ec --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api.data; + +import org.junit.Test; + +import java.lang.reflect.Field; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + + +public class TestLongOffset { + + @Test public void testConstructor() throws Exception { + LongOffset o1 = new LongOffset("12345"); + Field offsetField = LongOffset.class.getDeclaredField("offset"); + offsetField.setAccessible(true); + Long x = (Long) offsetField.get(o1); + assertEquals(x.longValue(), 12345L); + + o1 = new LongOffset("012345"); + x = (Long) offsetField.get(o1); + assertEquals(x.longValue(), 12345L); + + try { + o1 = new LongOffset("xyz"); + fail("Constructor of LongOffset should have failed w/ mal-formatted numbers"); + } catch (NumberFormatException nfe) { + // expected + } + } + + @Test public void testComparator() { + LongOffset o1 = new LongOffset("11111"); + Offset other = mock(Offset.class); + try { + o1.compareTo(other); + fail("compareTo() should have have failed when comparing to an object of a different class"); + } catch (IllegalArgumentException iae) { + // expected + } + + LongOffset o2 = new LongOffset("-10000"); + assertEquals(o1.compareTo(o2), 1); + LongOffset o3 = new LongOffset("22222"); + assertEquals(o1.compareTo(o3), -1); + LongOffset o4 = new LongOffset("11111"); + assertEquals(o1.compareTo(o4), 0); + } + + @Test public void testEquals() { + LongOffset o1 = new LongOffset("12345"); + Offset other = mock(Offset.class); + assertFalse(o1.equals(other)); + + LongOffset o2 = new LongOffset("0012345"); + assertTrue(o1.equals(o2)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java new file mode 100644 index 0000000..65c37e9 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api.internal; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.WindowState; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestOperators { + + private class TestMessage implements Message<String, Object> { + private final long timestamp; + private final String key; + private final Object msg; + + + TestMessage(String key, Object msg, long timestamp) { + this.timestamp = timestamp; + this.key = key; + this.msg = msg; + } + + @Override public Object getMessage() { + return this.msg; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } + } + + @Test public void testGetStreamOperator() { + Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() {{ + this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L)); + }}; + Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn); + assertEquals(strmOp.getFunction(), transformFn); + assertTrue(strmOp.getOutputStream() instanceof MessageStream); + } + + @Test public void testGetSinkOperator() { + MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> {}; + Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn); + assertEquals(sinkOp.getFunction(), sinkFn); + assertTrue(sinkOp.getOutputStream() == null); + } + + @Test public void testGetWindowOperator() { + WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null; + Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class); + Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class); + MessageStream<TestMessage> mockInput = mock(MessageStream.class); + when(windowFn.getTransformFunc()).thenReturn(xFunction); + when(windowFn.getStoreFuncs()).thenReturn(storeFns); + when(windowFn.getTrigger()).thenReturn(trigger); + when(mockInput.toString()).thenReturn("mockStream1"); + + Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn); + assertEquals(windowOp.getFunction(), xFunction); + assertEquals(windowOp.getStoreFunctions(), storeFns); + assertEquals(windowOp.getTrigger(), trigger); + assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString())); + } + + @Test public void testGetPartialJoinOperator() { + BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger = + (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(), + Math.max(m1.getTimestamp(), m2.getTimestamp())); + MessageStream<TestMessage> joinOutput = new MessageStream<>(); + Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin = + Operators.getPartialJoinOperator(merger, joinOutput); + + assertEquals(partialJoin.getOutputStream(), joinOutput); + Message<Object, Object> m = mock(Message.class); + Message<Object, Object> s = mock(Message.class); + assertEquals(partialJoin.getFunction(), merger); + assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); + assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m); + assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); + assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater()); + } + + @Test public void testGetMergeOperator() { + MessageStream<TestMessage> output = new MessageStream<>(); + + Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output); + Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() {{ + this.add(t); + }}; + TestMessage t = mock(TestMessage.class); + assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t)); + assertEquals(mergeOp.getOutputStream(), output); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java new file mode 100644 index 0000000..727276a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api.internal; + +import org.apache.samza.operators.api.WindowState; +import org.apache.samza.operators.api.data.Message; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + + +public class TestTrigger { + + @Test public void testConstructor() throws Exception { + BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; + BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; + Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); + Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { s.setOutputValue(0); return s; }; + Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { s.setOutputValue(1); return s; }; + + Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, + earlyTriggerUpdater, lateTriggerUpdater); + + Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); + Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); + Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); + Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); + Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); + earlyTriggerField.setAccessible(true); + lateTriggerField.setAccessible(true); + timerTriggerField.setAccessible(true); + earlyTriggerUpdaterField.setAccessible(true); + lateTriggerUpdaterField.setAccessible(true); + + assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); + assertEquals(timerTrigger, timerTriggerField.get(trigger)); + assertEquals(lateTrigger, lateTriggerField.get(trigger)); + assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); + assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java new file mode 100644 index 0000000..f3cf0e0 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api.internal; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +public class TestWindowOutput { + + @Test public void testConstructor() { + WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); + assertEquals(wndOutput.getKey(), "testMsg"); + assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); + assertFalse(wndOutput.isDelete()); + assertEquals(wndOutput.getTimestamp(), 0); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java new file mode 100644 index 0000000..9445f3a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.reactivestreams.Subscriber; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + + +public class TestOperatorImpl { + + TestMessage curInputMsg; + MessageCollector curCollector; + TaskCoordinator curCoordinator; + + @Test public void testSubscribers() { + this.curInputMsg = null; + this.curCollector = null; + this.curCoordinator = null; + OperatorImpl<TestMessage, TestOutputMessage> opImpl = new OperatorImpl<TestMessage, TestOutputMessage>() { + @Override protected void onNext(TestMessage message, MessageCollector collector, TaskCoordinator coordinator) { + TestOperatorImpl.this.curInputMsg = message; + TestOperatorImpl.this.curCollector = collector; + TestOperatorImpl.this.curCoordinator = coordinator; + } + }; + // verify subscribe() added the mockSub and nextProcessors() invoked the mockSub.onNext() + Subscriber<ProcessorContext<TestOutputMessage>> mockSub = mock(Subscriber.class); + opImpl.subscribe(mockSub); + TestOutputMessage xOutput = mock(TestOutputMessage.class); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.nextProcessors(xOutput, mockCollector, mockCoordinator); + verify(mockSub, times(1)).onNext(argThat(new ArgumentMatcher<ProcessorContext<TestOutputMessage>>() { + @Override public boolean matches(Object argument) { + ProcessorContext<TestOutputMessage> pCntx = (ProcessorContext<TestOutputMessage>) argument; + return pCntx.getMessage().equals(xOutput) && pCntx.getCoordinator().equals(mockCoordinator) && pCntx.getCollector().equals(mockCollector); + } + })); + // verify onNext() is invoked correctly + TestMessage mockInput = mock(TestMessage.class); + ProcessorContext<TestMessage> inCntx = new ProcessorContext<>(mockInput, mockCollector, mockCoordinator); + opImpl.onNext(inCntx); + assertEquals(mockInput, this.curInputMsg); + assertEquals(mockCollector, this.curCollector); + assertEquals(mockCoordinator, this.curCoordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java new file mode 100644 index 0000000..4bcf767 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.data.Message; + + +class TestOutputMessage implements Message<String, Integer> { + private final String key; + private final Integer value; + private final long timestamp; + + TestOutputMessage(String key, Integer value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public Integer getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java new file mode 100644 index 0000000..14796fc --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + + +public class TestProcessorContext { + @Test public void testConstructor() { + TestMessage mockMsg = mock(TestMessage.class); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class); + ProcessorContext<TestMessage> pCntx = new ProcessorContext<>(mockMsg, mockCollector, mockTaskCoordinator); + assertEquals(pCntx.getMessage(), mockMsg); + assertEquals(pCntx.getCollector(), mockCollector); + assertEquals(pCntx.getCoordinator(), mockTaskCoordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java new file mode 100644 index 0000000..50154f0 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.internal.Operators.StreamOperator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.Function; + +import static org.mockito.Mockito.*; + + +public class TestSimpleOperatorImpl { + + @Test public void testSimpleOperator() { + StreamOperator<TestMessage, TestOutputMessage> mockOp = mock(StreamOperator.class); + Function<TestMessage, Collection<TestOutputMessage>> txfmFn = mock(Function.class); + when(mockOp.getFunction()).thenReturn(txfmFn); + + SimpleOperatorImpl<TestMessage, TestOutputMessage> opImpl = spy(new SimpleOperatorImpl<>(mockOp)); + TestMessage inMsg = mock(TestMessage.class); + TestOutputMessage outMsg = mock(TestOutputMessage.class); + Collection<TestOutputMessage> mockOutputs = new ArrayList() {{ + this.add(outMsg); + }}; + when(txfmFn.apply(inMsg)).thenReturn(mockOutputs); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onNext(inMsg, mockCollector, mockCoordinator); + verify(txfmFn, times(1)).apply(inMsg); + verify(opImpl, times(1)).nextProcessors(outMsg, mockCollector, mockCoordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java new file mode 100644 index 0000000..eb8a23a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.internal.Operators.SinkOperator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +public class TestSinkOperatorImpl { + + @Test public void testSinkOperator() { + SinkOperator<TestOutputMessage> sinkOp = mock(SinkOperator.class); + MessageStream.VoidFunction3<TestOutputMessage, MessageCollector, TaskCoordinator> sinkFn = mock( + MessageStream.VoidFunction3.class); + when(sinkOp.getFunction()).thenReturn(sinkFn); + SinkOperatorImpl<TestOutputMessage> sinkImpl = new SinkOperatorImpl<>(sinkOp); + TestOutputMessage mockMsg = mock(TestOutputMessage.class); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + + sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator); + verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java new file mode 100644 index 0000000..eb8937a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.WindowState; +import org.apache.samza.operators.api.internal.Operators.StoreFunctions; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + + +public class TestStateStoreImpl { + @Test public void testStateStoreImpl() { + StoreFunctions<TestMessage, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class); + // test constructor + StateStoreImpl<TestMessage, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName"); + TaskContext mockContext = mock(TaskContext.class); + KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class); + when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore); + // test init() + storeImpl.init(mockContext); + verify(mockContext, times(1)).getStore("myStoreName"); + Function<TestMessage, String> wndKeyFn = mock(Function.class); + when(mockStoreFunctions.getStoreKeyFinder()).thenReturn(wndKeyFn); + TestMessage mockMsg = mock(TestMessage.class); + when(wndKeyFn.apply(mockMsg)).thenReturn("myKey"); + WindowState mockState = mock(WindowState.class); + when(mockKvStore.get("myKey")).thenReturn(mockState); + // test getState() + Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg); + assertEquals(storeEntry.getKey(), "myKey"); + assertEquals(storeEntry.getValue(), mockState); + verify(wndKeyFn, times(1)).apply(mockMsg); + verify(mockKvStore, times(1)).get("myKey"); + Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState); + WindowState mockNewState = mock(WindowState.class); + BiFunction<TestMessage, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class); + when(mockStoreFunctions.getStateUpdater()).thenReturn(mockUpdaterFn); + when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState); + // test updateState() + Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry); + assertEquals(newEntry.getKey(), "myKey"); + assertEquals(newEntry.getValue(), mockNewState); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java new file mode 100644 index 0000000..10ee2c7 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.data.serializers; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.serializers.Serde; +import org.apache.samza.operators.impl.data.avro.AvroData; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class SqlAvroSerdeTest { + public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.operators\",\n"+ + " \"type\": \"record\",\n"+ + " \"name\": \"Order\",\n"+ + " \"fields\": [\n"+ + " {\"name\": \"id\", \"type\": \"int\"},\n"+ + " {\"name\": \"product\", \"type\": \"string\"},\n"+ + " {\"name\": \"quantity\", \"type\": \"int\"}\n"+ + " ]\n"+ + "}"; + + public static Schema orderSchema = Schema.parse(ORDER_SCHEMA); + + private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig()); + + @Test + public void testSqlAvroSerdeDeserialization() throws IOException { + AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); + + Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT); + Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING); + } + + @Test + public void testSqlAvroSerialization() throws IOException { + AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); + byte[] encodedDatum = serde.toBytes(decodedDatumOriginal); + + AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum); + + Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT); + Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING); + } + + private static Config sqlAvroSerdeTestConfig(){ + Map<String, String> config = new HashMap<String, String>(); + config.put("serializers.sqlAvro.schema", ORDER_SCHEMA); + + return new MapConfig(config); + } + + private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException { + DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + BinaryEncoder encoder = new BinaryEncoder(output); + writer.write(datum, encoder); + encoder.flush(); + + return output.toByteArray(); + } + + private static GenericRecord sampleOrderRecord(){ + GenericData.Record datum = new GenericData.Record(orderSchema); + datum.put("id", 1); + datum.put("product", "paint"); + datum.put("quantity", 3); + + return datum; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java new file mode 100644 index 0000000..6947464 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.window; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.api.WindowState; +import org.apache.samza.operators.api.internal.Operators.StoreFunctions; +import org.apache.samza.operators.api.internal.Operators.WindowOperator; +import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.operators.impl.StateStoreImpl; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +import java.lang.reflect.Field; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + + +public class TestSessionWindowImpl { + Field wndStoreField = null; + Field txfmFnField = null; + + @Before public void prep() throws NoSuchFieldException { + wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore"); + txfmFnField = SessionWindowImpl.class.getDeclaredField("txfmFunction"); + wndStoreField.setAccessible(true); + txfmFnField.setAccessible(true); + } + + @Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException { + // test constructing a SessionWindowImpl w/ expected mock functions + MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class); + WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class); + StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class); + when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns); + when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store"); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class); + when(wndOp.getFunction()).thenReturn(mockTxfmFn); + SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm); + BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>> txfmFn = + (BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>>) txfmFnField.get(sessWnd); + assertEquals(mockTxfmFn, txfmFn); + StateStoreImpl<TestMessage, String, WindowState<Integer>> storeImpl = + (StateStoreImpl<TestMessage, String, WindowState<Integer>>) wndStoreField.get(sessWnd); + + // test init() and make sure the wndStore is initialized as expected + TestMessage mockMsg = mock(TestMessage.class); + TaskContext mockContext = mock(TaskContext.class); + KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class); + when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore); + Function<TestMessage, String> wndKeyFn = m -> "test-msg-key"; + when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn); + WindowState<Integer> mockState = mock(WindowState.class); + when(mockKvStore.get("test-msg-key")).thenReturn(mockState); + storeImpl.init(mockContext); + Entry<String, WindowState<Integer>> stateEntry = storeImpl.getState(mockMsg); + verify(mockStoreFns, times(1)).getStoreKeyFinder(); + verify(mockKvStore, times(1)).get("test-msg-key"); + assertEquals(stateEntry.getKey(), "test-msg-key"); + assertEquals(stateEntry.getValue(), mockState); + } + + @Test public void testInitAndProcess() { + MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class); + WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class); + StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class); + Function<TestMessage, String> wndKeyFn = m -> "test-msg-key"; + when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn); + when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns); + when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store"); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class); + when(wndOp.getFunction()).thenReturn(mockTxfmFn); + + // construct and init the SessionWindowImpl object + SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm); + TaskContext mockContext = mock(TaskContext.class); + KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class); + when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore); + sessWnd.init(mockContext); + + // test onNext() method. Make sure the right methods are invoked. + TestMessage mockMsg = mock(TestMessage.class); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + BiFunction<TestMessage, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class); + when(mockStoreFns.getStateUpdater()).thenReturn(stateUpdaterFn); + WindowState<Integer> mockNewState = mock(WindowState.class); + WindowState<Integer> oldState = mock(WindowState.class); + when(mockKvStore.get("test-msg-key")).thenReturn(oldState); + when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState); + sessWnd.onNext(mockMsg, mockCollector, mockCoordinator); + verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessage>() { + @Override public boolean matches(Object argument) { + TestMessage xIn = (TestMessage) argument; + return xIn.equals(mockMsg); + } + }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() { + @Override public boolean matches(Object argument) { + Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument; + return xIn.getKey().equals("test-msg-key") && xIn.getValue().equals(oldState); + } + })); + verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState); + verify(mockKvStore, times(1)).put("test-msg-key", mockNewState); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java new file mode 100644 index 0000000..91b0074 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.api.Windows; +import org.apache.samza.operators.api.TriggerBuilder; +import org.apache.samza.operators.api.data.IncomingSystemMessage; +import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collection; + + +/** + * Example implementation of split stream tasks + * + */ +public class BroadcastOperatorTask implements StreamOperatorTask { + class MessageType { + String field1; + String field2; + String field3; + String field4; + String parKey; + private long timestamp; + + public long getTimestamp() { return this.timestamp; } + } + + class JsonMessage extends InputJsonSystemMessage<MessageType> { + + JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { + super(key, data, offset, timestamp, partition); + } + } + + @Override public void initOperators(Collection<SystemMessageStream> sources) { + sources.forEach(source -> { + MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage); + + inputStream.filter(this::myFilter1). + window(Windows.<JsonMessage, String>intoSessionCounter( + m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). + setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). + addLateTriggerOnSizeLimit(10). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter2). + window(Windows.<JsonMessage, String>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)). + setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter3). + window(Windows.<JsonMessage, String, MessageType>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()). + setTriggers(TriggerBuilder + .<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000). + addTimeoutSinceFirstMessage(60000))); + } + ); + } + + JsonMessage getInputMessage(IncomingSystemMessage m1) { + return new JsonMessage( + m1.getKey().toString(), + (MessageType) m1.getMessage(), + m1.getOffset(), + this.getEventTime((GenericRecord)m1.getMessage()), + m1.getSystemStreamPartition()); + } + + long getEventTime(GenericRecord msg) { + return (Long) msg.get("event_time"); + } + + boolean myFilter1(JsonMessage m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key1"); + } + + boolean myFilter2(JsonMessage m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key2"); + } + + boolean myFilter3(JsonMessage m1) { + return m1.getMessage().parKey.equals("key3"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java new file mode 100644 index 0000000..5e710b2 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.operators.api.data.InputSystemMessage; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.system.SystemStreamPartition; + + +/** + * Example input message w/ avro message body and string as the key. + */ + +public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> { + + private final String key; + private final T data; + private final Offset offset; + private final long timestamp; + private final SystemStreamPartition partition; + + InputJsonSystemMessage(String key, T data, Offset offset, long timestamp, SystemStreamPartition partition) { + this.key = key; + this.data = data; + this.offset = offset; + this.timestamp = timestamp; + this.partition = partition; + } + + @Override public T getMessage() { + return this.data; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } + + @Override public Offset getOffset() { return this.offset; } + + @Override public SystemStreamPartition getSystemStreamPartition() { return this.partition; } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java new file mode 100644 index 0000000..825f4c4 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.api.data.IncomingSystemMessage; +import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +/** + * Example implementation of unique key-based stream-stream join tasks + * + */ +public class JoinOperatorTask implements StreamOperatorTask { + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessage extends InputJsonSystemMessage<MessageType> { + + JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { + super(key, data, offset, timestamp, partition); + } + } + + MessageStream<JsonMessage> joinOutput = null; + + @Override public void initOperators(Collection<SystemMessageStream> sources) { + sources.forEach(source -> { + MessageStream<JsonMessage> newSource = source.map(this::getInputMessage); + if (joinOutput == null) { + joinOutput = newSource; + } else { + joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2)); + } + }); + } + + private JsonMessage getInputMessage(IncomingSystemMessage ism) { + return new JsonMessage( + ((MessageType)ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getTimestamp(), + ism.getSystemStreamPartition()); + } + + JsonMessage myJoinResult(JsonMessage m1, JsonMessage m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessage(m1.getMessage().joinKey, newJoinMsg, null, m1.getTimestamp(), null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java new file mode 100644 index 0000000..306425e --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.Partition; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + + +public class TestStreamOperatorAdaptorTask { + Field userTaskField = null; + Field chainedOpsField = null; + + @Before public void prep() throws NoSuchFieldException { + userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask"); + chainedOpsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + userTaskField.setAccessible(true); + chainedOpsField.setAccessible(true); + } + + + @Test public void testConstructor() throws IllegalAccessException { + StreamOperatorTask userTask = mock(StreamOperatorTask.class); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); + StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask); + Map<SystemStreamPartition, ChainedOperators> chainsMap = (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask); + assertEquals(taskMemberVar, userTask); + assertTrue(chainsMap.isEmpty()); + } + + @Test public void testInit() throws Exception { + StreamOperatorTask userTask = mock(StreamOperatorTask.class); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + Set<SystemStreamPartition> testInputs = new HashSet() {{ + this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0))); + this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1))); + }}; + when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs); + adaptorTask.init(mockConfig, mockContext); + verify(userTask, times(1)).initOperators(Mockito.anyCollection()); + Map<SystemStreamPartition, ChainedOperators> chainsMap = (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask); + assertTrue(chainsMap.size() == 2); + assertTrue(chainsMap.containsKey(testInputs.toArray()[0])); + assertTrue(chainsMap.containsKey(testInputs.toArray()[1])); + } + + // TODO: window and process methods to be added after implementation of ChainedOperators.create() +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java new file mode 100644 index 0000000..d6181ea --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit test for {@link StreamOperatorTask} + */ +public class TestStreamOperatorTasks { + + private final WindowOperatorTask userTask = new WindowOperatorTask(); + + private final BroadcastOperatorTask splitTask = new BroadcastOperatorTask(); + + private final JoinOperatorTask joinTask = new JoinOperatorTask(); + + private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() {{ + for (int i = 0; i < 4; i++) { + this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i))); + } + }}; + + @Test public void testUserTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, ChainedOperators> pipelineMap = + (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } + + @Test public void testSplitTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, ChainedOperators> pipelineMap = + (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } + + @Test public void testJoinTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, ChainedOperators> pipelineMap = + (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java new file mode 100644 index 0000000..11186ea --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.api.TriggerBuilder; +import org.apache.samza.operators.api.Windows; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collection; + + +/** + * Example implementation of a simple user-defined tasks w/ window operators + * + */ +public class WindowOperatorTask implements StreamOperatorTask { + class MessageType { + String field1; + String field2; + } + + class JsonMessage extends InputJsonSystemMessage<MessageType> { + + JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { + super(key, data, offset, timestamp, partition); + } + } + + @Override public void initOperators(Collection<SystemMessageStream> sources) { + sources.forEach(source -> + source.map(m1 -> + new JsonMessage( + this.myMessageKeyFunction(m1), + (MessageType) m1.getMessage(), + m1.getOffset(), + m1.getTimestamp(), + m1.getSystemStreamPartition())). + window( + Windows.<JsonMessage, String>intoSessionCounter( + m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). + setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). + addTimeoutSinceLastMessage(30000))) + ); + } + + String myMessageKeyFunction(Message<Object, Object> m) { + return m.getKey().toString(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java index fbb5c59..ea9ee57 100644 --- a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java +++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java @@ -38,7 +38,8 @@ public class TestAvroSchemaConverter { " ]\n" + "}"; - public static final Schema simpleRecord = new Schema.Parser().parse(SIMPLE_RECORD_SCHEMA); + public static final Schema simpleRecord = Schema.parse(SIMPLE_RECORD_SCHEMA); + @Test public void testSimpleAvroRecord(){ RelDataTypeFactory relDataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/README.md ---------------------------------------------------------------------- diff --git a/samza-sql-core/README.md b/samza-sql-core/README.md deleted file mode 100644 index 72464dc..0000000 --- a/samza-sql-core/README.md +++ /dev/null @@ -1,17 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -samza-sql is an experimental module that is under development (SAMZA-390). http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java deleted file mode 100644 index d1b8409..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import java.util.List; -import java.util.Map; - - -public interface Data { - - Schema schema(); - - Object value(); - - int intValue(); - - long longValue(); - - float floatValue(); - - double doubleValue(); - - boolean booleanValue(); - - String strValue(); - - byte[] bytesValue(); - - List<Object> arrayValue(); - - Map<Object, Object> mapValue(); - - Data getElement(int index); - - Data getFieldData(String fldName); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java deleted file mode 100644 index 80ba455..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import java.util.HashMap; -import java.util.Map; - - -/** - * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. tables and streams. - */ -public class EntityName { - /** - * {@code EntityType} defines the types of the entity names - * - */ - private enum EntityType { - TABLE, - STREAM - }; - - /** - * Type of the entity name - */ - private final EntityType type; - - /** - * Formatted name of the entity. - * - * <p>This formatted name of the entity should be unique identifier for the corresponding table/stream in the system. - * e.g. for a Kafka system stream named "mystream", the formatted name should be "kafka:mystream". - */ - private final String name; - - /** - * Static map of already allocated table names - */ - private static Map<String, EntityName> tables = new HashMap<String, EntityName>(); - - /** - * Static map of already allocated stream names - */ - private static Map<String, EntityName> streams = new HashMap<String, EntityName>(); - - /** - * Private ctor to create entity names - * - * @param type Type of the entity name - * @param name Formatted name of the entity - */ - private EntityName(EntityType type, String name) { - this.type = type; - this.name = name; - } - - @Override - public String toString() { - return String.format("%s:%s", this.type, this.name); - } - - @Override - public boolean equals(Object other) { - if (other instanceof EntityName) { - EntityName otherEntity = (EntityName) other; - return this.type.equals(otherEntity.type) && this.name.equals(otherEntity.name); - } - return false; - } - - /** - * Check to see whether this entity name is for a table - * - * @return true if the entity type is {@code EntityType.TABLE}; false otherwise - */ - public boolean isTable() { - return this.type.equals(EntityType.TABLE); - } - - /** - * Check to see whether this entity name is for a stream - * - * @return true if the entity type is {@code EntityType.STREAM}; false otherwise - */ - public boolean isStream() { - return this.type.equals(EntityType.STREAM); - } - - /** - * Get the formatted entity name - * - * @return The formatted entity name - */ - public String getName() { - return this.name; - } - - /** - * Static method to get the instance of {@code EntityName} with type {@code EntityType.TABLE} - * - * @param name The formatted entity name of the relation - * @return A <code>EntityName</code> for a relation - */ - public static EntityName getTableName(String name) { - if (tables.get(name) == null) { - tables.put(name, new EntityName(EntityType.TABLE, name)); - } - return tables.get(name); - } - - /** - * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.STREAM</code> - * - * @param name The formatted entity name of the stream - * @return A <code>EntityName</code> for a stream - */ - public static EntityName getStreamName(String name) { - if (streams.get(name) == null) { - streams.put(name, new EntityName(EntityType.STREAM, name)); - } - return streams.get(name); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java deleted file mode 100644 index 72816a3..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import org.apache.samza.storage.kv.KeyValueStore; - - -/** - * This class defines the general interface of {@code Relation}, which is defined as a map of {@link org.apache.samza.sql.api.data.Tuple}. - * - * <p>The interface is defined as an extension to {@link org.apache.samza.storage.kv.KeyValueStore}. - * - */ - -public interface Relation<K> extends KeyValueStore<K, Tuple> { - - /** - * Get the name of the relation - * - * @return The relation name - */ - EntityName getName(); -}
