Repository: samza Updated Branches: refs/heads/master 202a15809 -> 1e5f30f38
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java new file mode 100644 index 0000000..0d6141e --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.samza.operators.impl.window;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.StateStoreImpl;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Session-window operator implementation that executes a {@link WindowOperator} spec.
+ * <p>
+ * For every incoming message the current window state is fetched from a {@link StateStoreImpl}, the
+ * window function is applied to the message and that state, the result is forwarded downstream, and the
+ * message together with the state entry is then passed to the store's {@code updateState}. The store is
+ * only created in {@link #init(MessageStream, TaskContext)}; calling {@code onNext} earlier would
+ * dereference a null store.
+ *
+ * @param <M> the type of input {@link Message}
+ * @param <RK> the type of window key
+ * @param <WS> the type of window state
+ * @param <RM> the type of aggregated value of the window
+ */
+public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>>
+    extends OperatorImpl<M, RM> {
+  /** Window spec supplying the window function, the store functions and the store name. */
+  private final WindowOperator<M, RK, WS, RM> sessWnd;
+  /** Window state store; assigned in {@link #init(MessageStream, TaskContext)}. */
+  private StateStoreImpl<M, RK, WS> wndStore = null;
+
+  /**
+   * Creates the implementation for the given session window spec.
+   *
+   * @param sessWnd the window operator spec to execute
+   */
+  public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) {
+    this.sessWnd = sessWnd;
+  }
+
+  @Override
+  protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // Window state currently associated with this message.
+    Entry<RK, WS> current = wndStore.getState(message);
+    // The output is computed from the state as it was before this message is applied.
+    nextProcessors(sessWnd.getFunction().apply(message, current), collector, coordinator);
+    // Hand the message and its prior state to the store; the returned entry is not used here.
+    wndStore.updateState(message, current);
+  }
+
+  /**
+   * Timer hook, currently a no-op.
+   * <p>
+   * This is intended to periodically check the timeout triggers to get the list of window states to be
+   * updated.
+   */
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    // Intentionally empty: timeout-triggered window updates are not implemented yet.
+  }
+
+  @Override
+  protected void init(MessageStream<M> source, TaskContext context) {
+    // The store is built from the spec's store functions and the store name derived for this source.
+    wndStore = new StateStoreImpl<>(sessWnd.getStoreFunctions(), sessWnd.getStoreName(source));
+    wndStore.init(context);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java new file mode 100644 index 0000000..c2f780d --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreams;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * An adaptor task class that invokes the user-implemented {@link StreamOperatorTask}s via the
+ * {@link org.apache.samza.operators.MessageStream} programming APIs.
+ */
+public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
+  /**
+   * A map from each input {@link SystemStreamPartition} to the {@link ChainedOperators} that take that
+   * {@link SystemStreamPartition} as their input stream. Populated in {@link #init(Config, TaskContext)}.
+   */
+  private final Map<SystemStreamPartition, ChainedOperators> operatorChains = new HashMap<>();
+
+  /**
+   * The wrapped user-defined {@link StreamOperatorTask}.
+   */
+  private final StreamOperatorTask userTask;
+
+  /**
+   * Constructor that wraps the user-defined {@link StreamOperatorTask}.
+   *
+   * @param userTask the user-defined {@link StreamOperatorTask}
+   */
+  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
+    this.userTask = userTask;
+  }
+
+  /**
+   * Initializes the wrapped task and builds one {@link ChainedOperators} per input partition.
+   * <p>
+   * If the user task is itself an {@link InitableTask} it is initialized first. Then a
+   * {@link SystemMessageStream} is created for every {@link SystemStreamPartition} of this task, the user
+   * task attaches its operators to those streams via {@code initOperators}, and finally the operator graph
+   * hanging off each stream is instantiated with {@link ChainedOperators#create}.
+   */
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    if (this.userTask instanceof InitableTask) {
+      ((InitableTask) this.userTask).init(config, context);
+    }
+    Map<SystemStreamPartition, SystemMessageStream> sources = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> {
+      SystemMessageStream ds = MessageStreams.input(ssp);
+      sources.put(ssp, ds);
+    });
+    this.userTask.initOperators(sources.values());
+    sources.forEach((ssp, ds) -> operatorChains.put(ssp,
+        ChainedOperators.create(ds, context)));
+  }
+
+  /**
+   * Routes an incoming message to the operator chain built for its {@link SystemStreamPartition}.
+   *
+   * @throws IllegalStateException if no operator chain was created for the message's partition in
+   *     {@link #init(Config, TaskContext)}
+   */
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    SystemStreamPartition ssp = ime.getSystemStreamPartition();
+    ChainedOperators chain = this.operatorChains.get(ssp);
+    if (chain == null) {
+      // Fail with a descriptive message instead of a bare NullPointerException.
+      throw new IllegalStateException("No operator chain found for input partition " + ssp);
+    }
+    chain.onNext(new IncomingSystemMessage(ime), collector, coordinator);
+  }
+
+  /**
+   * Fires the timer on every operator chain, then delegates to the user task if it is a
+   * {@link WindowableTask}.
+   */
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    this.operatorChains.forEach((ssp, chain) -> chain.onTimer(collector, coordinator));
+    if (this.userTask instanceof WindowableTask) {
+      ((WindowableTask) this.userTask).window(collector, coordinator);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
new file mode 100644
index 0000000..e3a70e8
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Unit tests for {@link ChainedOperators#create}. They inspect, via reflection on the private
+ * {@code subscribers} fields, how the operator graph declared on a {@link MessageStream} is wired into
+ * {@link OperatorImpl}s.
+ */
+public class TestChainedOperators {
+  Field subsField = null;
+  Field opSubsField = null;
+
+  @Before public void prep() throws NoSuchFieldException {
+    // Both ChainedOperators and OperatorImpl keep their downstream operators in a private "subscribers" field.
+    subsField = ChainedOperators.class.getDeclaredField("subscribers");
+    subsField.setAccessible(true);
+    opSubsField = OperatorImpl.class.getDeclaredField("subscribers");
+    opSubsField.setAccessible(true);
+  }
+
+  @Test public void testCreate() {
+    // test creation of empty chain
+    MessageStream<TestMessage> testStream = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testStream, mockContext);
+    assertNotNull(operatorChain);
+  }
+
+  @Test public void testLinearChain() throws IllegalAccessException {
+    // test creation of linear chain: input -> map -> window
+    MessageStream<TestMessage> testInput = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessage::getKey));
+    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestMessage> firstOpImpl = subsSet.iterator().next();
+    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(firstOpImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(wndOpImpl);
+    // the window operator terminates the chain
+    assertEquals(0, subsOps.size());
+  }
+
+  @Test public void testBroadcastChain() throws IllegalAccessException {
+    // test creation of broadcast chain: input fans out to (filter -> flatMap) and (filter -> map)
+    MessageStream<TestMessage> testInput = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
+    testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m);
+    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
+    assertEquals(2, subsSet.size());
+    // The test does not assume which branch the subscriber Set yields first: both branches have the same
+    // shape (a filter with exactly one downstream operator that has no subscribers of its own), so both
+    // are checked with identical assertions.
+    Iterator<OperatorImpl> iter = subsSet.iterator();
+    // check the first branch
+    OperatorImpl<TestMessage, TestMessage> opImpl = iter.next();
+    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> leafImpl = subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(leafImpl);
+    assertEquals(0, subsOps.size());
+    // check the second branch
+    opImpl = iter.next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    leafImpl = subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(leafImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  @Test public void testJoinChain() throws IllegalAccessException {
+    // test creation of join chain
+    MessageStream<TestMessage> input1 = new MessageStream<>();
+    MessageStream<TestMessage> input2 = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m -> m);
+    // now, we create chained operators from each input sources
+    ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, mockContext);
+    ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, mockContext);
+    // check that those two chains will merge at map operator
+    // first branch of the join
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = subsSet.iterator().next();
+    Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp1);
+    assertEquals(1, subsOps.size());
+    // the map operator consumes the common join output, where two branches merge
+    Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = subsOps.iterator().next();
+    // second branch of the join
+    subsSet = (Set<OperatorImpl>) subsField.get(chain2);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = subsSet.iterator().next();
+    assertNotSame(joinOp1, joinOp2);
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp2);
+    assertEquals(1, subsOps.size());
+    // make sure that the map operator is the very same instance in both branches
+    assertSame(mapImpl, subsOps.iterator().next());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
new file mode 100644
index 0000000..cb4576c
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@code OperatorFactory.getOperator}. Each operator spec type must be mapped to the
+ * matching {@link OperatorImpl} subclass and, where checked via reflection, the implementation must hold
+ * the spec or function it was created from.
+ */
+public class TestOperatorFactory {
+
+  @Test public void testGetOperator() throws NoSuchFieldException, IllegalAccessException {
+    // get window operator
+    WindowOperator mockWnd = mock(WindowOperator.class);
+    Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean>
+        factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockWnd);
+    assertFalse(factoryEntry.getValue());
+    OperatorImpl<TestMessage, TestOutputMessage> opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SessionWindowImpl);
+    Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
+    sessWndField.setAccessible(true);
+    WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl);
+    // the implementation must wrap exactly the spec it was created from (expected value first)
+    assertSame(mockWnd, sessWnd);
+
+    // get simple operator
+    StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = mock(StreamOperator.class);
+    Function<TestMessage, Collection<TestOutputMessage>> mockTxfmFn = mock(Function.class);
+    when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn);
+    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockSimpleOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SimpleOperatorImpl);
+    Field txfmFnField = SimpleOperatorImpl.class.getDeclaredField("transformFn");
+    txfmFnField.setAccessible(true);
+    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+    // get sink operator
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, mc, tc) -> { };
+    SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class);
+    when(sinkOp.getFunction()).thenReturn(sinkFn);
+    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(sinkOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SinkOperatorImpl);
+    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc");
+    sinkFnField.setAccessible(true);
+    assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+    // get join operator (only the implementation type is checked here)
+    PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> joinOp = mock(PartialJoinOperator.class);
+    TestOutputMessage mockOutput = mock(TestOutputMessage.class);
+    BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) -> mockOutput;
+    when(joinOp.getFunction()).thenReturn(joinFn);
+    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(joinOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof PartialJoinOpImpl);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
new file mode 100644
index 0000000..4bd467d
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.reactivestreams.Subscriber;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for the subscriber plumbing of {@link OperatorImpl}: {@code subscribe}, {@code nextProcessors}
+ * and {@code onNext(ProcessorContext)}.
+ */
+public class TestOperatorImpl {
+
+  // Arguments captured by the anonymous OperatorImpl's onNext() override in testSubscribers().
+  TestMessage curInputMsg;
+  MessageCollector curCollector;
+  TaskCoordinator curCoordinator;
+
+  @Test public void testSubscribers() {
+    curInputMsg = null;
+    curCollector = null;
+    curCoordinator = null;
+    OperatorImpl<TestMessage, TestOutputMessage> operator = new OperatorImpl<TestMessage, TestOutputMessage>() {
+      @Override protected void onNext(TestMessage message, MessageCollector collector, TaskCoordinator coordinator) {
+        TestOperatorImpl.this.curInputMsg = message;
+        TestOperatorImpl.this.curCollector = collector;
+        TestOperatorImpl.this.curCoordinator = coordinator;
+      }
+    };
+
+    // Part 1: subscribe() registers the subscriber, and nextProcessors() forwards to its onNext().
+    Subscriber<ProcessorContext<TestOutputMessage>> subscriber = mock(Subscriber.class);
+    operator.subscribe(subscriber);
+    TestOutputMessage output = mock(TestOutputMessage.class);
+    MessageCollector collector = mock(MessageCollector.class);
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+    operator.nextProcessors(output, collector, coordinator);
+    ArgumentMatcher<ProcessorContext<TestOutputMessage>> forwardedContext =
+        new ArgumentMatcher<ProcessorContext<TestOutputMessage>>() {
+          @Override public boolean matches(Object argument) {
+            ProcessorContext<TestOutputMessage> context = (ProcessorContext<TestOutputMessage>) argument;
+            if (!context.getMessage().equals(output)) {
+              return false;
+            }
+            if (!context.getCoordinator().equals(coordinator)) {
+              return false;
+            }
+            return context.getCollector().equals(collector);
+          }
+        };
+    verify(subscriber, times(1)).onNext(argThat(forwardedContext));
+
+    // Part 2: onNext(ProcessorContext) must reach the overridden onNext() with the unwrapped values.
+    TestMessage input = mock(TestMessage.class);
+    ProcessorContext<TestMessage> inputContext = new ProcessorContext<>(input, collector, coordinator);
+    operator.onNext(inputContext);
+    assertEquals(input, curInputMsg);
+    assertEquals(collector, curCollector);
+    assertEquals(coordinator, curCoordinator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
new file mode 100644
index 0000000..224245e
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Unit tests for {@link ProcessorContext}: the constructor arguments must be returned unchanged by the
+ * corresponding getters.
+ */
+public class TestProcessorContext {
+  @Test public void testConstructor() {
+    TestMessage mockMsg = mock(TestMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+    ProcessorContext<TestMessage> pCntx = new ProcessorContext<>(mockMsg, mockCollector, mockTaskCoordinator);
+    // JUnit convention: expected value first, actual value second
+    assertEquals(mockMsg, pCntx.getMessage());
+    assertEquals(mockCollector, pCntx.getCollector());
+    assertEquals(mockTaskCoordinator, pCntx.getCoordinator());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
new file mode 100644
index 0000000..de029ea
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link SimpleOperatorImpl}: each output produced by the transform function must be
+ * forwarded to {@code nextProcessors} with the original collector and coordinator.
+ */
+public class TestSimpleOperatorImpl {
+
+  @Test public void testSimpleOperator() {
+    StreamOperator<TestMessage, TestOutputMessage> mockOp = mock(StreamOperator.class);
+    Function<TestMessage, Collection<TestOutputMessage>> txfmFn = mock(Function.class);
+    when(mockOp.getFunction()).thenReturn(txfmFn);
+
+    SimpleOperatorImpl<TestMessage, TestOutputMessage> opImpl = spy(new SimpleOperatorImpl<>(mockOp));
+    TestMessage inMsg = mock(TestMessage.class);
+    TestOutputMessage outMsg = mock(TestOutputMessage.class);
+    // A typed single-element collection; replaces a raw-typed double-brace ArrayList (anonymous subclass).
+    Collection<TestOutputMessage> mockOutputs = Collections.singletonList(outMsg);
+    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+    verify(txfmFn, times(1)).apply(inMsg);
+    verify(opImpl, times(1)).nextProcessors(outMsg, mockCollector, mockCoordinator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java new file mode 100644 index 0000000..cdac3fc --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link SinkOperatorImpl}: a delivered message must be passed, together with the collector
+ * and coordinator, to the sink function of the wrapped {@link SinkOperator}.
+ */
+public class TestSinkOperatorImpl {
+
+  @Test public void testSinkOperator() {
+    // A mocked sink function wired into a mocked sink operator spec.
+    MessageStream.VoidFunction3<TestOutputMessage, MessageCollector, TaskCoordinator> sinkFunction =
+        mock(MessageStream.VoidFunction3.class);
+    SinkOperator<TestOutputMessage> sinkSpec = mock(SinkOperator.class);
+    when(sinkSpec.getFunction()).thenReturn(sinkFunction);
+    SinkOperatorImpl<TestOutputMessage> sink = new SinkOperatorImpl<>(sinkSpec);
+
+    TestOutputMessage message = mock(TestOutputMessage.class);
+    MessageCollector collector = mock(MessageCollector.class);
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+
+    // One delivered message must result in exactly one sink-function call with the same arguments.
+    sink.onNext(message, collector, coordinator);
+    verify(sinkFunction, times(1)).apply(message, collector, coordinator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
new file mode 100644
index 0000000..5ede757
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link StateStoreImpl}: store lookup in {@code init}, key derivation plus lookup in
+ * {@code getState}, and the state-updater result returned by {@code updateState}.
+ */
+public class TestStateStoreImpl {
+  @Test public void testStateStoreImpl() {
+    StoreFunctions<TestMessage, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
+    // test constructor
+    StateStoreImpl<TestMessage, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
+    // test init()
+    storeImpl.init(mockContext);
+    verify(mockContext, times(1)).getStore("myStoreName");
+    Function<TestMessage, String> wndKeyFn = mock(Function.class);
+    when(mockStoreFunctions.getStoreKeyFinder()).thenReturn(wndKeyFn);
+    TestMessage mockMsg = mock(TestMessage.class);
+    when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
+    WindowState mockState = mock(WindowState.class);
+    when(mockKvStore.get("myKey")).thenReturn(mockState);
+    // test getState() (JUnit convention: expected value first)
+    Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
+    assertEquals("myKey", storeEntry.getKey());
+    assertEquals(mockState, storeEntry.getValue());
+    verify(wndKeyFn, times(1)).apply(mockMsg);
+    verify(mockKvStore, times(1)).get("myKey");
+    Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
+    WindowState mockNewState = mock(WindowState.class);
+    BiFunction<TestMessage, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFunctions.getStateUpdater()).thenReturn(mockUpdaterFn);
+    when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
+    // test updateState()
+    Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
+    assertEquals("myKey", newEntry.getKey());
+    assertEquals(mockNewState, newEntry.getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
new file mode 100644
index 0000000..719ab99
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.window; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.internal.Operators.StoreFunctions; +import org.apache.samza.operators.internal.Operators.WindowOperator; +import org.apache.samza.operators.internal.WindowOutput; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +import java.lang.reflect.Field; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + + +public class TestSessionWindowImpl { + Field wndStoreField = null; + Field sessWndField = null; + + @Before public void prep() throws NoSuchFieldException { + wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore"); + sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd"); + wndStoreField.setAccessible(true); + sessWndField.setAccessible(true); + } + + @Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException { + // test constructing a SessionWindowImpl w/ expected mock functions + WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = 
mock(WindowOperator.class); + SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp); + assertEquals(wndOp, sessWndField.get(sessWnd)); + } + + @Test public void testInitAndProcess() throws IllegalAccessException { + WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class); + BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class); + SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp); + + // construct and init the SessionWindowImpl object + MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class); + StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class); + Function<TestMessage, String> wndKeyFn = m -> "test-msg-key"; + when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn); + when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns); + when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store"); + when(wndOp.getFunction()).thenReturn(mockTxfmFn); + TaskContext mockContext = mock(TaskContext.class); + KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class); + when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore); + sessWnd.init(mockInputStrm, mockContext); + + // test onNext() method. Make sure the transformation function and the state update functions are invoked. 
+ TestMessage mockMsg = mock(TestMessage.class); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + BiFunction<TestMessage, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class); + when(mockStoreFns.getStateUpdater()).thenReturn(stateUpdaterFn); + WindowState<Integer> mockNewState = mock(WindowState.class); + WindowState<Integer> oldState = mock(WindowState.class); + when(mockKvStore.get("test-msg-key")).thenReturn(oldState); + when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState); + sessWnd.onNext(mockMsg, mockCollector, mockCoordinator); + verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessage>() { + @Override public boolean matches(Object argument) { + TestMessage xIn = (TestMessage) argument; + return xIn.equals(mockMsg); + } + }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() { + @Override public boolean matches(Object argument) { + Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument; + return xIn.getKey().equals("test-msg-key") && xIn.getValue().equals(oldState); + } + })); + verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState); + verify(mockKvStore, times(1)).put("test-msg-key", mockNewState); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java new file mode 100644 index 0000000..d1b0a88 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.TriggerBuilder; +import org.apache.samza.operators.Windows; +import org.apache.samza.operators.data.IncomingSystemMessage; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.task.StreamOperatorTask; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collection; + + +/** + * Example implementation of split stream tasks + * + */ +public class BroadcastOperatorTask implements StreamOperatorTask { + class MessageType { + String field1; + String field2; + String field3; + String field4; + String parKey; + private long timestamp; + + public long getTimestamp() { + return this.timestamp; + } + } + + class JsonMessage extends InputJsonSystemMessage<MessageType> { + + JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { + super(key, data, offset, timestamp, partition); + } + } + + @Override public void initOperators(Collection<SystemMessageStream> sources) { + sources.forEach(source -> { + MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage); + + 
inputStream.filter(this::myFilter1). + window(Windows.<JsonMessage, String>intoSessionCounter( + m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). + setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). + addLateTriggerOnSizeLimit(10). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter2). + window(Windows.<JsonMessage, String>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)). + setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100). + addTimeoutSinceLastMessage(30000))); + + inputStream.filter(this::myFilter3). + window(Windows.<JsonMessage, String, MessageType>intoSessions( + m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()). + setTriggers(TriggerBuilder.<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000). + addTimeoutSinceFirstMessage(60000))); + } + ); + } + + JsonMessage getInputMessage(IncomingSystemMessage m1) { + return (JsonMessage) m1.getMessage(); + } + + boolean myFilter1(JsonMessage m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key1"); + } + + boolean myFilter2(JsonMessage m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key2"); + } + + boolean myFilter3(JsonMessage m1) { + return m1.getMessage().parKey.equals("key3"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java new file mode 100644 index 0000000..88aa159 --- /dev/null +++ 
b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.operators.data.InputSystemMessage; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.system.SystemStreamPartition; + + +/** + * Example input message w/ Json message body and string as the key. 
+ */ + +public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> { + + private final String key; + private final T data; + private final Offset offset; + private final long timestamp; + private final SystemStreamPartition partition; + + InputJsonSystemMessage(String key, T data, Offset offset, long timestamp, SystemStreamPartition partition) { + this.key = key; + this.data = data; + this.offset = offset; + this.timestamp = timestamp; + this.partition = partition; + } + + @Override public T getMessage() { + return this.data; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } + + @Override public Offset getOffset() { + return this.offset; + } + + @Override public SystemStreamPartition getSystemStreamPartition() { + return this.partition; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java new file mode 100644 index 0000000..f6b3ff8 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.data.IncomingSystemMessage; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.task.StreamOperatorTask; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +/** + * Example implementation of unique key-based stream-stream join tasks + * + */ +public class JoinOperatorTask implements StreamOperatorTask { + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessage extends InputJsonSystemMessage<MessageType> { + + JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { + super(key, data, offset, timestamp, partition); + } + } + + MessageStream<JsonMessage> joinOutput = null; + + @Override public void initOperators(Collection<SystemMessageStream> sources) { + sources.forEach(source -> { + MessageStream<JsonMessage> newSource = source.map(this::getInputMessage); + if (joinOutput == null) { + joinOutput = newSource; + } else { + joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2)); + } + }); + } + + private JsonMessage getInputMessage(IncomingSystemMessage ism) { + return new JsonMessage( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getTimestamp(), + 
ism.getSystemStreamPartition()); + } + + JsonMessage myJoinResult(JsonMessage m1, JsonMessage m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessage(m1.getMessage().joinKey, newJoinMsg, null, m1.getTimestamp(), null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java new file mode 100644 index 0000000..47d6b3a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.operators.task.StreamOperatorTask; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.Partition; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + + +public class TestStreamOperatorAdaptorTask { + Field userTaskField = null; + Field chainedOpsField = null; + + @Before public void prep() throws NoSuchFieldException { + userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask"); + chainedOpsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + userTaskField.setAccessible(true); + chainedOpsField.setAccessible(true); + } + + + @Test public void testConstructor() throws IllegalAccessException { + StreamOperatorTask userTask = mock(StreamOperatorTask.class); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); + StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask); + Map<SystemStreamPartition, ChainedOperators> chainsMap = (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask); + assertEquals(taskMemberVar, userTask); + assertTrue(chainsMap.isEmpty()); + } + + @Test public void testInit() throws Exception { + StreamOperatorTask userTask = mock(StreamOperatorTask.class); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + Set<SystemStreamPartition> testInputs = new HashSet() { { + this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0))); + 
this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1))); + } }; + when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs); + adaptorTask.init(mockConfig, mockContext); + verify(userTask, times(1)).initOperators(Mockito.anyCollection()); + Map<SystemStreamPartition, ChainedOperators> chainsMap = (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask); + assertTrue(chainsMap.size() == 2); + assertTrue(chainsMap.containsKey(testInputs.toArray()[0])); + assertTrue(chainsMap.containsKey(testInputs.toArray()[1])); + } + + // TODO: window and process methods to be added after implementation of ChainedOperators.create() +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java new file mode 100644 index 0000000..44efa6d --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask} + */ +public class TestStreamOperatorTasks { + + private final WindowOperatorTask userTask = new WindowOperatorTask(); + + private final BroadcastOperatorTask splitTask = new BroadcastOperatorTask(); + + private final JoinOperatorTask joinTask = new JoinOperatorTask(); + + private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { + for (int i = 0; i < 4; i++) { + this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i))); + } + } }; + + @Test public void testUserTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, ChainedOperators> pipelineMap = + (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + 
assertNotNull(pipelineMap.get(partition)); + }); + } + + @Test public void testSplitTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, ChainedOperators> pipelineMap = + (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } + + @Test public void testJoinTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask); + Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); + pipelineMapFld.setAccessible(true); + Map<SystemStreamPartition, ChainedOperators> pipelineMap = + (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + assertEquals(pipelineMap.size(), 4); + this.inputPartitions.forEach(partition -> { + assertNotNull(pipelineMap.get(partition)); + }); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java new 
file mode 100644 index 0000000..de7bba5 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.TriggerBuilder; +import org.apache.samza.operators.Windows; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.task.StreamOperatorTask; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collection; + + +/** + * Example implementation of a simple user-defined tasks w/ window operators + * + */ +public class WindowOperatorTask implements StreamOperatorTask { + class MessageType { + String field1; + String field2; + } + + class JsonMessage extends InputJsonSystemMessage<MessageType> { + + JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { + super(key, data, offset, timestamp, partition); + } + } + + @Override public void initOperators(Collection<SystemMessageStream> sources) { + sources.forEach(source -> + 
source.map(m1 -> + new JsonMessage( + this.myMessageKeyFunction(m1), + (MessageType) m1.getMessage(), + m1.getOffset(), + m1.getTimestamp(), + m1.getSystemStreamPartition())). + window( + Windows.<JsonMessage, String>intoSessionCounter( + m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). + setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). + addTimeoutSinceLastMessage(30000))) + ); + } + + String myMessageKeyFunction(Message<Object, Object> m) { + return m.getKey().toString(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 6ea62b4..813882c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -20,8 +20,9 @@ include \ 'samza-api', 'samza-elasticsearch', 'samza-log4j', - 'samza-shell', - 'samza-rest' + 'samza-operator', + 'samza-rest', + 'samza-shell' def scalaModules = [ 'samza-core',
