http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java new file mode 100644 index 0000000..02637a3 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.samza.operators.impl;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.TaskContext;
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


/**
 * Tests for the operator-implementation wiring performed by {@link OperatorGraph}.
 *
 * The two factory methods under test ({@code createOperatorImpl} and {@code createOperatorImpls}) and the
 * {@code nextOperators} field of {@link OperatorImpl} are looked up reflectively in {@link #prep()} and made
 * accessible, so that the tests can invoke and inspect them directly.
 */
public class TestOperatorImpls {
  Field nextOperatorsField = null;
  Method createOpMethod = null;
  Method createOpsMethod = null;

  /**
   * Resolves the reflective handles shared by all tests.
   *
   * @throws NoSuchFieldException if {@link OperatorImpl} declares no {@code nextOperators} field
   * @throws NoSuchMethodException if {@link OperatorGraph} declares no factory method with the expected signature
   */
  @Before
  public void prep() throws NoSuchFieldException, NoSuchMethodException {
    nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
    nextOperatorsField.setAccessible(true);

    createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
        OperatorSpec.class, Config.class, TaskContext.class);
    createOpMethod.setAccessible(true);

    createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
    createOpsMethod.setAccessible(true);
  }

  /**
   * Checks that {@code createOperatorImpl} returns the implementation class matching each kind of operator spec
   * (window, stream, sink, partial join) and, for the first three, that the implementation holds the window or
   * function object supplied by the spec.
   */
  @Test
  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
    // get window operator
    WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
    when(mockWnd.getWindow()).thenReturn(windowInternal);
    MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
    Config mockConfig = mock(Config.class);
    TaskContext mockContext = mock(TaskContext.class);

    OperatorGraph opGraph = new OperatorGraph();
    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
        createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
    assertTrue(opImpl instanceof WindowOperatorImpl);
    Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
    wndInternalField.setAccessible(true);
    WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
    // JUnit's contract is assertEquals(expected, actual)
    assertEquals(windowInternal, wndInternal);

    // get simple operator
    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
    when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
    assertTrue(opImpl instanceof StreamOperatorImpl);
    Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
    txfmFnField.setAccessible(true);
    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));

    // get sink operator
    SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
    SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
    assertTrue(opImpl instanceof SinkOperatorImpl);
    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
    sinkFnField.setAccessible(true);
    assertEquals(sinkFn, sinkFnField.get(opImpl));

    // get join operator
    PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
    when(joinOp.getTransformFn()).thenReturn(joinFn);
    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
    assertTrue(opImpl instanceof PartialJoinOperatorImpl);
  }

  /**
   * Checks that {@code createOperatorImpls} returns a non-null root for a (mocked) stream.
   */
  @Test
  public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
    // test creation of empty chain
    MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
    TaskContext mockContext = mock(TaskContext.class);
    Config mockConfig = mock(Config.class);
    OperatorGraph opGraph = new OperatorGraph();
    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
    assertNotNull(operatorChain);
  }

  /**
   * Checks the shape produced for {@code input.map(...).window(...)}: the root has one successor, that successor
   * has one successor, and the last operator has none.
   */
  @Test
  public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
    // test creation of linear chain
    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
    TaskContext mockContext = mock(TaskContext.class);
    Config mockConfig = mock(Config.class);
    testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
    OperatorGraph opGraph = new OperatorGraph();
    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
    assertEquals(1, subsSet.size());
    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
    assertEquals(1, subsOps.size());
    OperatorImpl wndOpImpl = subsOps.iterator().next();
    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
    assertEquals(0, subsOps.size());
  }

  /**
   * Checks the shape produced when two independent chains (filter + flatMap, filter + map) consume the same input:
   * the root has two successors and each of them is followed by exactly one terminal operator.
   */
  @Test
  public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
    // test creation of broadcast chain
    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
    TaskContext mockContext = mock(TaskContext.class);
    Config mockConfig = mock(Config.class);
    testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> Arrays.asList(m, m));
    testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
    OperatorGraph opGraph = new OperatorGraph();
    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
    assertEquals(2, subsSet.size());
    Iterator<OperatorImpl> iter = subsSet.iterator();
    // The successors are exposed as a Set, so which branch the iterator yields first is not established here;
    // both branches have the same shape (filter followed by one terminal operator), so the checks are symmetric.
    // check one branch
    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
    assertEquals(1, subsOps.size());
    OperatorImpl firstBranchTail = subsOps.iterator().next();
    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstBranchTail);
    assertEquals(0, subsOps.size());
    // check the other branch
    opImpl = iter.next();
    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
    assertEquals(1, subsOps.size());
    OperatorImpl secondBranchTail = subsOps.iterator().next();
    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(secondBranchTail);
    assertEquals(0, subsOps.size());
  }

  /**
   * Checks the shape produced for {@code input1.join(input2, ...).map(...)}: each input gets its own join operator,
   * and both join operators feed the same downstream map operator.
   */
  @Test
  public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
    // test creation of join chain
    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
    MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
    TaskContext mockContext = mock(TaskContext.class);
    Config mockConfig = mock(Config.class);
    input1
        .join(input2,
            new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
              @Override
              public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
                return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
              }

              @Override
              public String getFirstKey(TestMessageEnvelope message) {
                return message.getKey();
              }

              @Override
              public String getSecondKey(TestMessageEnvelope message) {
                return message.getKey();
              }
            })
        .map(m -> m);
    OperatorGraph opGraph = new OperatorGraph();
    // now, we create chained operators from each input source
    RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
    RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
    // check that those two chains will merge at map operator
    // first branch of the join
    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
    assertEquals(1, subsSet.size());
    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
    assertEquals(1, subsOps.size());
    // the map operator consumes the common join output, where two branches merge
    OperatorImpl mapImpl = subsOps.iterator().next();
    // second branch of the join
    subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
    assertEquals(1, subsSet.size());
    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
    assertNotSame(joinOp1, joinOp2);
    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
    assertEquals(1, subsOps.size());
    // make sure that the map operator is the same
    assertEquals(mapImpl, subsOps.iterator().next());
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java new file mode 100644 index 0000000..ce9fdd2 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.samza.operators.impl;

import org.apache.samza.config.Config;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;

import static org.mockito.Mockito.*;


/**
 * Test for {@link SinkOperatorImpl}.
 */
public class TestSinkOperatorImpl {

  /**
   * A message passed to {@code onNext} must be handed exactly once to the sink function that the spec provides,
   * together with the same collector and coordinator.
   */
  @Test
  public void testSinkOperator() {
    // collaborators passed through onNext
    TestOutputMessageEnvelope message = mock(TestOutputMessageEnvelope.class);
    MessageCollector collector = mock(MessageCollector.class);
    TaskCoordinator coordinator = mock(TaskCoordinator.class);

    // spec that exposes a mocked sink function
    SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
    SinkOperatorSpec<TestOutputMessageEnvelope> spec = mock(SinkOperatorSpec.class);
    when(spec.getSinkFn()).thenReturn(sinkFn);

    SinkOperatorImpl<TestOutputMessageEnvelope> operator =
        new SinkOperatorImpl<>(spec, mock(Config.class), mock(TaskContext.class));
    operator.onNext(message, collector, coordinator);

    verify(sinkFn, times(1)).apply(message, collector, coordinator);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java new file mode 100644 index 0000000..010a210 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.samza.operators.impl;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;

import static org.mockito.Mockito.*;


/**
 * Test for {@link StreamOperatorImpl}.
 */
public class TestStreamOperatorImpl {

  /**
   * {@code onNext} must apply the spec's transform function to the incoming message exactly once and propagate
   * the (single) element of the returned collection exactly once with the same collector and coordinator.
   */
  @Test
  public void testSimpleOperator() {
    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
    when(mockOp.getTransformFn()).thenReturn(txfmFn);
    MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
    Config mockConfig = mock(Config.class);
    TaskContext mockContext = mock(TaskContext.class);
    // spy so that the call to propagateResult on the real operator can be verified
    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
    TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
    TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
    // typed list instead of a raw anonymous ArrayList subclass: no unchecked assignment, no extra class
    Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList<>();
    mockOutputs.add(outMsg);
    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
    MessageCollector mockCollector = mock(MessageCollector.class);
    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
    verify(txfmFn, times(1)).apply(inMsg);
    verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java new file mode 100644 index 0000000..31257a4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.samza.operators.spec;

import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;


/**
 * Tests for the factory methods of {@link OperatorSpecs}: each test builds a spec and checks that the spec
 * returns the function, window and output stream it was created with.
 */
public class TestOperatorSpecs {
  /**
   * A stream operator spec must return the transform function and the output stream it was created with.
   */
  @Test
  public void testGetStreamOperator() {
    FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
        this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
      } };
    MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
    // JUnit's contract is assertEquals(expected, actual)
    assertEquals(transformFn, strmOp.getTransformFn());
    assertEquals(mockOutput, strmOp.getNextStream());
  }

  /**
   * A sink operator spec must return the sink function it was created with and must have no next stream.
   */
  @Test
  public void testGetSinkOperator() {
    SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
          TaskCoordinator taskCoordinator) -> { };
    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
    assertEquals(sinkFn, sinkOp.getSinkFn());
    assertNull(sinkOp.getNextStream());
  }

  /**
   * A window operator spec must return the window it was created with, and that window must expose the key
   * extractor and fold function passed to its constructor.
   */
  @Test
  public void testGetWindowOperator() throws Exception {
    Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
    BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1;

    // instantiate the window directly through the WindowInternal constructor
    WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);

    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
    WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
    assertEquals(window, spec.getWindow());
    assertEquals(keyExtractor, spec.getWindow().getKeyExtractor());
    assertEquals(aggregator, spec.getWindow().getFoldFunction());
  }

  /**
   * A partial join operator spec must return the join function and the output stream it was created with.
   */
  @Test
  public void testGetPartialJoinOperator() {
    PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
      new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
        @Override
        public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
          return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
        }

        @Override
        public Object getKey(MessageEnvelope<Object, ?> message) {
          return message.getKey();
        }

        @Override
        public Object getOtherKey(MessageEnvelope<Object, ?> message) {
          return message.getKey();
        }
      };

    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
    PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
        OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);

    assertEquals(joinOutput, partialJoin.getNextStream());
    assertEquals(merger, partialJoin.getTransformFn());
  }

  /**
   * A merge operator spec must forward each message unchanged (its transform function returns a collection equal
   * to a single-element list holding the input) and must return the output stream it was created with.
   */
  @Test
  public void testGetMergeOperator() {
    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
    MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
    // reference behaviour: wrap the message in a one-element list
    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
        this.add(t);
      } };
    TestMessageEnvelope t = mock(TestMessageEnvelope.class);
    assertEquals(mergeFn.apply(t), mergeOp.getTransformFn().apply(t));
    assertEquals(output, mergeOp.getNextStream());
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/README.md ---------------------------------------------------------------------- diff --git a/samza-operator/README.md b/samza-operator/README.md deleted file mode 100644 index 15d2092..0000000 --- a/samza-operator/README.md +++ /dev/null @@ -1,17 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License.
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -samza-operator is an experimental module that is under development (SAMZA-552). http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java deleted file mode 100644 index 830e4a5..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.samza.operators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.HashSet;
import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.TaskContext;


/**
 * The implementation for input/output {@link MessageStream}s to/from the operators.
 * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
 *
 * Every API call creates an {@link OperatorSpec} and records it in this stream's set of registered specs;
 * calls that produce output return the stream the new spec writes to.
 *
 * @param <M> type of messages in this {@link MessageStream}
 */
public class MessageStreamImpl<M> implements MessageStream<M> {
  /**
   * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
   */
  private final StreamGraphImpl graph;

  /**
   * The set of operators that consume the messages in this {@link MessageStream}
   */
  private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();

  /**
   * Default constructor
   *
   * @param graph the {@link StreamGraphImpl} object that this stream belongs to
   */
  MessageStreamImpl(StreamGraphImpl graph) {
    this.graph = graph;
  }

  /**
   * Registers a map operator spec on this stream.
   *
   * @param mapFn the function applied to each message
   * @param <TM> type of the mapped messages
   * @return the output stream of the new operator spec
   */
  @Override
  public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
    OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
    this.registeredOperatorSpecs.add(op);
    return op.getNextStream();
  }

  /**
   * Registers a filter operator spec on this stream.
   *
   * @param filterFn the predicate applied to each message
   * @return the output stream of the new operator spec
   */
  @Override
  public MessageStream<M> filter(FilterFunction<M> filterFn) {
    OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
    this.registeredOperatorSpecs.add(op);
    return op.getNextStream();
  }

  /**
   * Registers a flat-map operator spec on this stream.
   *
   * @param flatMapFn the function applied to each message
   * @param <TM> type of the messages produced by {@code flatMapFn}
   * @return the output stream of the new operator spec
   */
  @Override
  public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
    this.registeredOperatorSpecs.add(op);
    return op.getNextStream();
  }

  /**
   * Registers a sink operator spec on this stream; a sink has no output stream.
   *
   * @param sinkFn the function that consumes each message
   */
  @Override
  public void sink(SinkFunction<M> sinkFn) {
    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
  }

  /**
   * Registers a send-to operator spec built from the sink function of the given output stream.
   *
   * @param stream the output stream to send messages to
   */
  @Override
  public void sendTo(OutputStream<M> stream) {
    this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
  }

  /**
   * Registers a window operator spec on this stream.
   *
   * @param window the window definition; it is cast to {@link WindowInternal}, so any other {@link Window}
   *               implementation fails with a {@link ClassCastException}
   * @param <K> type of the window key
   * @param <WV> type of the window value
   * @return the output stream of window panes
   */
  @Override
  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
    OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
        this.graph, new MessageStreamImpl<>(this.graph));
    this.registeredOperatorSpecs.add(wndOp);
    return wndOp.getNextStream();
  }

  /**
   * Joins this stream with {@code otherStream}. The join is expressed as two partial join operator specs, one
   * registered on each input stream, that both write to the same output stream.
   *
   * @param otherStream the stream to join with; it is cast to {@link MessageStreamImpl}
   * @param joinFn supplies the keys of both sides and combines a matching pair of messages
   * @param <K> type of the join key
   * @param <OM> type of messages in the other stream
   * @param <RM> type of the joined messages
   * @return the shared output stream of both partial joins
   */
  @Override
  public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn) {
    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);

    // view of joinFn for messages arriving on this stream
    PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, OM, RM>() {
      @Override
      public RM apply(M m1, OM om) {
        return joinFn.apply(m1, om);
      }

      @Override
      public K getKey(M message) {
        return joinFn.getFirstKey(message);
      }

      @Override
      public K getOtherKey(OM message) {
        return joinFn.getSecondKey(message);
      }

      @Override
      public void init(Config config, TaskContext context) {
        joinFn.init(config, context);
      }
    };

    // view of joinFn for messages arriving on the other stream: argument order and key roles are swapped
    // NOTE(review): only parJoin1 forwards init(...) to joinFn, presumably so joinFn is initialized once - confirm
    PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, OM, M, RM>() {
      @Override
      public RM apply(OM m1, M m) {
        return joinFn.apply(m, m1);
      }

      @Override
      public K getKey(OM message) {
        return joinFn.getSecondKey(message);
      }

      @Override
      public K getOtherKey(M message) {
        return joinFn.getFirstKey(message);
      }
    };

    // TODO: need to add default store functions for the two partial join functions

    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(
        OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, this.graph, outputStream));
    this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream));
    return outputStream;
  }

  /**
   * Merges this stream with {@code otherStreams}: a merge operator spec writing to one common output stream is
   * registered on every given stream and on this stream.
   *
   * The caller's collection is not modified.
   *
   * @param otherStreams the streams to merge with this one; each is cast to {@link MessageStreamImpl}
   * @return the common output stream
   */
  @Override
  public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);

    // Work on a private copy: appending to the caller's collection would change it behind the caller's back
    // and would fail outright for unmodifiable collections.
    List<MessageStream<M>> streamsToMerge = new ArrayList<>(otherStreams);
    streamsToMerge.add(this);
    streamsToMerge.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
        add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
    return outputStream;
  }

  /**
   * Registers a partition operator spec that sends messages to an intermediate stream created by the graph
   * for the given key extractor.
   *
   * @param parKeyExtractor the function passed to the graph when it creates the intermediate stream
   * @param <K> type of the partition key
   * @return the intermediate stream
   */
  @Override
  public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
    MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
    OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
    this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
        this.graph, outputStream));
    return intStream;
  }

  /**
   * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
   * should not be exposed to users.
   *
   * @return an unmodifiable collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
   */
  public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
    return Collections.unmodifiableSet(this.registeredOperatorSpecs);
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java deleted file mode 100644 index dca3469..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import java.util.Properties; -import java.util.function.Function; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.serializers.Serde; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to - * create system input/output/intermediate streams. - */ -public class StreamGraphImpl implements StreamGraph { - - /** - * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope} - * in the input {@link MessageStream}s. 
- */ - private int opId = 0; - - private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> { - final StreamSpec spec; - final Serde<K> keySerde; - final Serde<V> msgSerde; - - InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - super(graph); - this.spec = streamSpec; - this.keySerde = keySerde; - this.msgSerde = msgSerde; - } - - StreamSpec getSpec() { - return this.spec; - } - - } - - private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> { - final StreamSpec spec; - final Serde<K> keySerde; - final Serde<V> msgSerde; - - OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - this.spec = streamSpec; - this.keySerde = keySerde; - this.msgSerde = msgSerde; - } - - StreamSpec getSpec() { - return this.spec; - } - - @Override - public SinkFunction<M> getSinkFunction() { - return (M message, MessageCollector mc, TaskCoordinator tc) -> { - // TODO: need to find a way to directly pass in the serde class names - // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(), - // message.getKey(), message.getKey(), message.getMessage())); - mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage())); - }; - } - } - - private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> { - final Function<M, PK> parKeyFn; - - /** - * Default constructor - * - * @param graph the {@link StreamGraphImpl} object that this stream belongs to - */ - IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - this(graph, streamSpec, keySerde, msgSerde, null); - } - - IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> 
msgSerde, Function<M, PK> parKeyFn) { - super(graph, streamSpec, keySerde, msgSerde); - this.parKeyFn = parKeyFn; - } - - @Override - public SinkFunction<M> getSinkFunction() { - return (M message, MessageCollector mc, TaskCoordinator tc) -> { - // TODO: need to find a way to directly pass in the serde class names - // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(), - // message.getKey(), message.getKey(), message.getMessage())); - if (this.parKeyFn == null) { - mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage())); - } else { - // apply partition key function - mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage())); - } - }; - } - } - - /** - * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl} - */ - private final Map<SystemStream, MessageStream> inStreams = new HashMap<>(); - private final Map<SystemStream, OutputStream> outStreams = new HashMap<>(); - - private ContextManager contextManager = new ContextManager() { }; - - @Override - public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { - this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); - } - return this.inStreams.get(streamSpec.getSystemStream()); - } - - /** - * Helper method to be used by {@link MessageStreamImpl} class - * - * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as the output - * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream} - * @return the {@link MessageStreamImpl} object - */ - @Override - public <K, V, M extends 
MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { - this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); - } - return this.outStreams.get(streamSpec.getSystemStream()); - } - - /** - * Helper method to be used by {@link MessageStreamImpl} class - * - * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream} - * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream} - * @return the {@link MessageStreamImpl} object - */ - @Override - public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { - this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde)); - } - IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream()); - if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { - this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream); - } - return intStream; - } - - @Override public Map<StreamSpec, MessageStream> getInStreams() { - Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>(); - this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry)); - return Collections.unmodifiableMap(inStreamMap); - } - - @Override public Map<StreamSpec, OutputStream> getOutStreams() { - Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>(); - this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry)); - return Collections.unmodifiableMap(outStreamMap); - } - - @Override - 
public StreamGraph withContextManager(ContextManager manager) { - this.contextManager = manager; - return this; - } - - public int getNextOpId() { - return this.opId++; - } - - public ContextManager getContextManager() { - return this.contextManager; - } - - /** - * Helper method to be get the input stream via {@link SystemStream} - * - * @param systemStream the {@link SystemStream} - * @return a {@link MessageStreamImpl} object corresponding to the {@code systemStream} - */ - public MessageStreamImpl getInputStream(SystemStream systemStream) { - if (this.inStreams.containsKey(systemStream)) { - return (MessageStreamImpl) this.inStreams.get(systemStream); - } - return null; - } - - <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) { - if (this.outStreams.containsValue(intStream)) { - return (OutputStream<M>) intStream; - } - return null; - } - - <M> MessageStream<M> getIntStream(OutputStream<M> outStream) { - if (this.inStreams.containsValue(outStream)) { - return (MessageStream<M>) outStream; - } - return null; - } - - /** - * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method. 
- * - * @param parKeyFn the function to extract the partition key from the input message - * @param <PK> the type of partition key - * @param <M> the type of input message - * @return the {@link OutputStream} object for the re-partitioned stream - */ - <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) { - // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec} - StreamSpec streamSpec = new StreamSpec() { - @Override - public SystemStream getSystemStream() { - // TODO: should auto-generate intermedaite stream name here - return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId)); - } - - @Override - public Properties getProperties() { - return null; - } - }; - - if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { - this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn)); - } - IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream()); - if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { - this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream); - } - return intStream; - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java deleted file mode 100644 index 809a70a..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.functions; - -import org.apache.samza.annotation.InterfaceStability; - - -/** - * This defines the interface function a two-way join functions that takes input messages from two input - * {@link org.apache.samza.operators.MessageStream}s and merge them into a single output joined message in the join output - */ -@InterfaceStability.Unstable -public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction { - - /** - * Method to perform join method on the two input messages - * - * @param m1 message from the first input stream - * @param om message from the second input stream - * @return the joined message in the output stream - */ - RM apply(M m1, OM om); - - /** - * Method to get the key from the input message - * - * @param message the input message from the first strean - * @return the join key in the {@code message} - */ - K getKey(M message); - - /** - * Method to get the key from the input message in the other stream - * - * @param message the input message from the other stream - * @return the join key in the {@code message} - */ - K getOtherKey(OM message); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java deleted file mode 100644 index 66336f8..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.TaskContext; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - - -/** - * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a - * {@link MessageStreamImpl} - */ -public class OperatorGraph { - - /** - * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG - * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created - * according to a single instance of {@link OperatorSpec}. - */ - private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>(); - - /** - * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages. - */ - private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>(); - - /** - * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}. - * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and - * instantiate the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input. 
- * - * @param inputStreams the map of input {@link org.apache.samza.operators.MessageStream}s - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators - */ - public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) { - inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context))); - } - - /** - * Method to get the corresponding {@link RootOperatorImpl} - * - * @param ss input {@link SystemStream} - * @param <M> the type of input message - * @return the {@link OperatorImpl} that starts processing the input message - */ - public <M> OperatorImpl<M, M> get(SystemStream ss) { - return this.operatorGraph.get(ss); - } - - /** - * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl}, - * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node. - * - * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for - * @param <M> the type of messagess in the {@code source} {@link MessageStreamImpl} - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators - * @return root node for the {@link OperatorImpl} DAG - */ - private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, - TaskContext context) { - // since the source message stream might have multiple operator specs registered on it, - // create a new root node as a single point of entry for the DAG. - RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>(); - // create the pipeline/topology starting from the source - source.getRegisteredOperatorSpecs().forEach(registeredOperator -> { - // pass in the source and context s.t. 
stateful stream operators can initialize their stores - OperatorImpl<M, ?> operatorImpl = - this.createAndRegisterOperatorImpl(registeredOperator, source, config, context); - rootOperator.registerNextOperator(operatorImpl); - }); - return rootOperator; - } - - /** - * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding - * {@link OperatorImpl}s. - * - * @param operatorSpec the operatorSpec registered with the {@code source} - * @param source the source {@link MessageStreamImpl} - * @param <M> type of input message - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators - * @return the operator implementation for the operatorSpec - */ - private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec, - MessageStreamImpl<M> source, Config config, TaskContext context) { - if (!operators.containsKey(operatorSpec)) { - OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context); - if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) { - // this is the first time we've added the operatorImpl corresponding to the operatorSpec, - // so traverse and initialize and register the rest of the DAG. - // initialize the corresponding operator function - operatorSpec.init(config, context); - MessageStreamImpl nextStream = operatorSpec.getNextStream(); - if (nextStream != null) { - Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs(); - registeredSpecs.forEach(registeredSpec -> { - OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context); - operatorImpl.registerNextOperator(subImpl); - }); - } - return operatorImpl; - } - } - - // the implementation corresponding to operatorSpec has already been instantiated - // and registered, so we do not need to traverse the DAG further. 
- return operators.get(operatorSpec); - } - - /** - * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. - * - * @param source the source {@link MessageStreamImpl} - * @param <M> type of input message - * @param operatorSpec the immutable {@link OperatorSpec} definition. - * @param config the {@link Config} required to instantiate operators - * @param context the {@link TaskContext} required to instantiate operators - * @return the {@link OperatorImpl} implementation instance - */ - private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) { - if (operatorSpec instanceof StreamOperatorSpec) { - StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec; - return new StreamOperatorImpl<>(streamOpSpec, source, config, context); - } else if (operatorSpec instanceof SinkOperatorSpec) { - return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context); - } else if (operatorSpec instanceof WindowOperatorSpec) { - return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context); - } else if (operatorSpec instanceof PartialJoinOperatorSpec) { - return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context); - } - throw new IllegalArgumentException( - String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java deleted file mode 100644 index abb1fa9..0000000 --- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - -import java.util.HashSet; -import java.util.Set; - - -/** - * Abstract base class for all stream operator implementations. - */ -public abstract class OperatorImpl<M, RM> { - - private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>(); - - /** - * Register the next operator in the chain that this operator should propagate its output to. - * @param nextOperator the next operator in the chain. - */ - void registerNextOperator(OperatorImpl<RM, ?> nextOperator) { - nextOperators.add(nextOperator); - } - - /** - * Perform the transformation required for this operator and call the downstream operators. - * - * Must call {@link #propagateResult} to propage the output to registered downstream operators correctly. 
- * - * @param message the input message - * @param collector the {@link MessageCollector} in the context - * @param coordinator the {@link TaskCoordinator} in the context - */ - public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); - - /** - * Helper method to propagate the output of this operator to all registered downstream operators. - * - * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly. - * - * @param outputMessage output message - * @param collector the {@link MessageCollector} in the context - * @param coordinator the {@link TaskCoordinator} in the context - */ - void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java deleted file mode 100644 index c8515e1..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Implementation of a {@link PartialJoinOperatorSpec}. This class implements function - * that only takes in one input stream among all inputs to the join and generate the join output. - * - * @param <M> type of messages in the input stream - * @param <JM> type of messages in the stream to join with - * @param <RM> type of messages in the joined stream - */ -class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> { - - PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, MessageStreamImpl<M> source, Config config, TaskContext context) { - // TODO: implement PartialJoinOperatorImpl constructor - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - // TODO: implement PartialJoinOperatorImpl processing logic - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java deleted file 
mode 100644 index 4b30a5d..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -/** - * A no-op operator implementation that forwards incoming messages to all of its subscribers. 
- * @param <M> type of incoming messages - */ -final class RootOperatorImpl<M> extends OperatorImpl<M, M> { - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.propagateResult(message, collector, coordinator); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java deleted file mode 100644 index 2bb362c..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Default implementation class of a {@link WindowOperatorSpec} for a session window. - * - * @param <M> the type of input message - * @param <RK> the type of window key - * @param <WV> the type of window state - */ -class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> { - - private final WindowOperatorSpec<M, RK, WV> windowSpec; - - SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) { - this.windowSpec = windowSpec; - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - } - - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - // This is to periodically check the timeout triggers to get the list of window states to be updated - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java deleted file mode 100644 index 41d1778..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Implementation for {@link SinkOperatorSpec} - */ -class SinkOperatorImpl<M> extends OperatorImpl<M, M> { - - private final SinkFunction<M> sinkFn; - - SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) { - this.sinkFn = sinkOp.getSinkFn(); - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.sinkFn.apply(message, collector, coordinator); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java deleted file mode 100644 index 644de20..0000000 --- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message. 
- * - * @param <M> type of message in the input stream - * @param <RM> type of message in the output stream - */ -class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> { - - private final FlatMapFunction<M, RM> transformFn; - - StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) { - this.transformFn = streamOperatorSpec.getTransformFn(); - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - // call the transform function and then for each output call propagateResult() - this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java deleted file mode 100644 index af00553..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.internal.WindowInternal; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - -public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> { - - private final WindowInternal<M, WK, WV> window; - - public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) { - // source, config, and context are used to initialize the window kv-store - window = spec.getWindow(); - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java deleted file mode 100644 index 1444662..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.spec; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.task.TaskContext; - - -/** - * A stateless serializable stream operator specification that holds all the information required - * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}. - * - * @param <OM> the type of output message from the operator - */ -@InterfaceStability.Unstable -public interface OperatorSpec<OM> { - - enum OpCode { - MAP, - FLAT_MAP, - FILTER, - SINK, - SEND_TO, - JOIN, - WINDOW, - MERGE, - PARTITION_BY - } - - - /** - * Get the output stream containing transformed messages produced by this operator. - * @return the output stream containing transformed messages produced by this operator. - */ - MessageStreamImpl<OM> getNextStream(); - - /** - * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
- * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } -} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java deleted file mode 100644 index d626852..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.spec; - -import java.util.Collection; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.functions.FilterFunction; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.internal.WindowInternal; - -import java.util.ArrayList; -import org.apache.samza.task.TaskContext; - - -/** - * Factory methods for creating {@link OperatorSpec} instances. - */ -public class OperatorSpecs { - - private OperatorSpecs() {} - - /** - * Creates a {@link StreamOperatorSpec} for {@link MapFunction} - * - * @param mapFn the map function - * @param graph the {@link StreamGraphImpl} object - * @param output the output {@link MessageStreamImpl} object - * @param <M> type of input message - * @param <OM> type of output message - * @return the {@link StreamOperatorSpec} - */ - public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) { - return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() { - @Override - public Collection<OM> apply(M message) { - return new ArrayList<OM>() { - { - OM r = mapFn.apply(message); - if (r != null) { - this.add(r); - } - } - }; - } - - @Override - public void init(Config config, TaskContext context) { - mapFn.init(config, context); - } - }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId()); - } - - /** - * Creates a {@link StreamOperatorSpec} for {@link FilterFunction} - * - * @param filterFn the transformation function - * @param graph the {@link 
StreamGraphImpl} object - * @param output the output {@link MessageStreamImpl} object - * @param <M> type of input message - * @return the {@link StreamOperatorSpec} - */ - public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) { - return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() { - @Override - public Collection<M> apply(M message) { - return new ArrayList<M>() { - { - if (filterFn.apply(message)) { - this.add(message); - } - } - }; - } - - @Override - public void init(Config config, TaskContext context) { - filterFn.init(config, context); - } - }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId()); - } - - /** - * Creates a {@link StreamOperatorSpec}. - * - * @param transformFn the transformation function - * @param graph the {@link StreamGraphImpl} object - * @param output the output {@link MessageStreamImpl} object - * @param <M> type of input message - * @param <OM> type of output message - * @return the {@link StreamOperatorSpec} - */ - public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec( - FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) { - return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId()); - } - - /** - * Creates a {@link SinkOperatorSpec}. - * - * @param sinkFn the sink function - * @param <M> type of input message - * @param graph the {@link StreamGraphImpl} object - * @return the {@link SinkOperatorSpec} - */ - public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) { - return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId()); - } - - /** - * Creates a {@link SinkOperatorSpec}. 
- * - * @param sinkFn the sink function - * @param graph the {@link StreamGraphImpl} object - * @param stream the {@link OutputStream} where the message is sent to - * @param <M> type of input message - * @return the {@link SinkOperatorSpec} - */ - public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) { - return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream); - } - - /** - * Creates a {@link SinkOperatorSpec}. - * - * @param sinkFn the sink function - * @param graph the {@link StreamGraphImpl} object - * @param stream the {@link OutputStream} where the message is sent to - * @param <M> type of input message - * @return the {@link SinkOperatorSpec} - */ - public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) { - return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream); - } - - /** - * Creates a {@link WindowOperatorSpec}. - * - * @param window the description of the window. - * @param graph the {@link StreamGraphImpl} object - * @param wndOutput the window output {@link MessageStreamImpl} object - * @param <M> the type of input message - * @param <WK> the type of key in the {@link WindowPane} - * @param <WV> the type of value in the window - * @return the {@link WindowOperatorSpec} - */ - - public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec( - WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) { - return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId()); - } - - /** - * Creates a {@link PartialJoinOperatorSpec}. 
- * - * @param partialJoinFn the join function - * @param graph the {@link StreamGraphImpl} object - * @param joinOutput the output {@link MessageStreamImpl} - * @param <M> type of input message - * @param <K> type of join key - * @param <JM> the type of message in the other join stream - * @param <OM> the type of message in the join output - * @return the {@link PartialJoinOperatorSpec} - */ - public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec( - PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) { - return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId()); - } - - /** - * Creates a {@link StreamOperatorSpec} with a merger function. - * - * @param graph the {@link StreamGraphImpl} object - * @param mergeOutput the output {@link MessageStreamImpl} from the merger - * @param <M> the type of input message - * @return the {@link StreamOperatorSpec} for the merge - */ - public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) { - return new StreamOperatorSpec<M, M>(message -> - new ArrayList<M>() { - { - this.add(message); - } - }, - mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId()); - } -}
