http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
new file mode 100644
index 0000000..cc70b6b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.control;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.message.EndOfStreamMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link EndOfStreamManager}: per-stream end-of-stream tracking, broadcasting
+ * end-of-stream messages to intermediate streams, and shutting down the task once all of its
+ * input streams have ended.
+ */
+public class TestEndOfStreamManager {
+  // Reports four partitions (0-3) for every stream; initialized in setup().
+  StreamMetadataCache metadataCache;
+
+  @Before
+  public void setup() {
+    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+    for (int p = 0; p < 4; p++) {
+      partitionMetadata.put(new Partition(p), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    }
+    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
+    metadataCache = mock(StreamMetadataCache.class);
+    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
+  }
+
+  @Test
+  public void testUpdateFromInputSource() {
+    // a single end-of-stream envelope on the only partition of an input stream ends that stream
+    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+    TaskName taskName = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
+    EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp), mock(TaskCoordinator.class));
+    assertTrue(manager.isEndOfStream(ssp.getSystemStream()));
+  }
+
+  @Test
+  public void testUpdateFromIntermediateStream() {
+    // An intermediate-stream partition ends only once end-of-stream messages from all upstream tasks
+    // (the task count carried by each EndOfStreamMessage) have arrived, and a stream ends only when
+    // every partition of it assigned to this task has ended.
+    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
+    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
+    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
+    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
+
+    TaskName taskName = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps) {
+      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    }
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
+    EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
+
+    int envelopeCount = 4;
+    IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount];
+    for (int i = 0; i < envelopeCount; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new EndOfStreamMessage("task " + i, envelopeCount));
+    }
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+
+    // verify the first three messages won't result in end-of-stream
+    for (int i = 0; i < 3; i++) {
+      manager.update(envelopes[i], coordinator);
+      assertFalse(manager.isEndOfStream(ssps[0].getSystemStream()));
+    }
+    // the fourth message will end the stream
+    manager.update(envelopes[3], coordinator);
+    assertTrue(manager.isEndOfStream(ssps[0].getSystemStream()));
+    assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
+
+    // stream2 has two partitions assigned to this task, so it requires a message from each partition to end it
+    envelopes = new IncomingMessageEnvelope[envelopeCount];
+    for (int i = 0; i < envelopeCount; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
+    }
+    // verify the messages for the partition 0 won't result in end-of-stream
+    for (int i = 0; i < envelopeCount; i++) {
+      manager.update(envelopes[i], coordinator);
+      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
+    }
+    for (int i = 0; i < envelopeCount; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
+    }
+    for (int i = 0; i < 3; i++) {
+      manager.update(envelopes[i], coordinator);
+      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
+    }
+    // the fourth message will end the stream
+    manager.update(envelopes[3], coordinator);
+    assertTrue(manager.isEndOfStream(ssps[1].getSystemStream()));
+  }
+
+  @Test
+  public void testUpdateFromIntermediateStreamWith2Tasks() {
+    // Task 0 consumes test-stream-1 and partition 0 of test-stream-2; Task 1 consumes partition 1 of
+    // test-stream-2. Each task is expected to broadcast end-of-stream to the intermediate stream once
+    // every input partition assigned to it has ended.
+    SystemStreamPartition[] ssps0 = new SystemStreamPartition[2];
+    ssps0[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
+    ssps0[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
+
+    SystemStreamPartition ssp1 = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
+
+    TaskName t0 = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps0) {
+      streamToTasks.put(ssp.getSystemStream(), t0.getTaskName());
+    }
+
+    TaskName t1 = new TaskName("Task 1");
+    // key by the SystemStream (not the partition), otherwise a lookup by stream never finds Task 1
+    streamToTasks.put(ssp1.getSystemStream(), t1.getTaskName());
+
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("test-stream-1", "test-stream-1", "test-system"));
+    inputs.add(new StreamSpec("test-stream-2", "test-stream-2", "test-system"));
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+
+    // both managers are built without a metadata cache or collector, so their sends are stubbed out
+    EndOfStreamManager manager0 = spy(new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null));
+    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
+    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[0]), mock(TaskCoordinator.class));
+    assertTrue(manager0.isEndOfStream(ssps0[0].getSystemStream()));
+    verify(manager0, never()).sendEndOfStream(any(), anyInt());
+    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[1]), mock(TaskCoordinator.class));
+    assertTrue(manager0.isEndOfStream(ssps0[1].getSystemStream()));
+    verify(manager0).sendEndOfStream(any(), anyInt());
+
+    EndOfStreamManager manager1 = spy(new EndOfStreamManager("Task 1", listener, streamToTasks, Collections.singleton(ssp1), null, null));
+    doNothing().when(manager1).sendEndOfStream(any(), anyInt());
+    manager1.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), mock(TaskCoordinator.class));
+    assertTrue(manager1.isEndOfStream(ssp1.getSystemStream()));
+    verify(manager1).sendEndOfStream(any(), anyInt());
+  }
+
+  @Test
+  public void testSendEndOfStream() {
+    // sendEndOfStream should emit messages covering all four partitions reported by the metadata
+    // cache, each carrying this task's name and the given task count.
+    StreamSpec ints = new StreamSpec("int-stream", "int-stream", "test-system");
+    StreamSpec input = new StreamSpec("input-stream", "input-stream", "test-system");
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(Collections.singletonList(input), ints, true);
+
+    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
+    for (int i = 0; i < 8; i++) {
+      inputToTasks.put(input.toSystemStream(), "Task " + i);
+    }
+
+    MessageCollector collector = mock(MessageCollector.class);
+    TaskName taskName = new TaskName("Task 0");
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+    EndOfStreamManager manager = new EndOfStreamManager(taskName.getTaskName(),
+        listener,
+        inputToTasks,
+        Collections.emptySet(),
+        metadataCache,
+        collector);
+
+    Set<Integer> partitions = new HashSet<>();
+    doAnswer(invocation -> {
+        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
+        partitions.add((Integer) envelope.getPartitionKey());
+        EndOfStreamMessage eosMessage = (EndOfStreamMessage) envelope.getMessage();
+        assertEquals(taskName.getTaskName(), eosMessage.getTaskName());
+        assertEquals(8, eosMessage.getTaskCount());
+        return null;
+      }).when(collector).send(any());
+
+    manager.sendEndOfStream(input.toSystemStream(), 8);
+    assertEquals(4, partitions.size());
+  }
+
+  @Test
+  public void testPropagate() {
+    // One task consumes both inputs of the IO node and the intermediate stream: end-of-stream on both
+    // inputs is propagated to the intermediate stream, and end-of-stream on the intermediate stream
+    // (after both inputs) shuts the task down.
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
+    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+
+    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
+    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
+    SystemStream ints = new SystemStream("test-system", "int-stream");
+    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
+    ssps[0] = new SystemStreamPartition(input1, new Partition(0));
+    ssps[1] = new SystemStreamPartition(input2, new Partition(0));
+    ssps[2] = new SystemStreamPartition(ints, new Partition(0));
+
+    Set<SystemStreamPartition> sspSet = new HashSet<>(Arrays.asList(ssps));
+    TaskName taskName = new TaskName("task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps) {
+      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    }
+
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+    MessageCollector collector = mock(MessageCollector.class);
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+    EndOfStreamManager manager = spy(
+        new EndOfStreamManager("task 0", listener, streamToTasks, sspSet, metadataCache, collector));
+    // stub before any update so the spy never performs a real send
+    doNothing().when(manager).sendEndOfStream(any(), anyInt());
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+
+    // ssp1 end-of-stream, wait for ssp2
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[0]), coordinator);
+    verify(manager, never()).sendEndOfStream(any(), anyInt());
+
+    // ssp2 end-of-stream, propagate to intermediate
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[1]), coordinator);
+    ArgumentCaptor<SystemStream> argument = ArgumentCaptor.forClass(SystemStream.class);
+    verify(manager).sendEndOfStream(argument.capture(), anyInt());
+    assertEquals(ints, argument.getValue());
+
+    // intermediate end-of-stream, shutdown the task
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[2]), coordinator);
+    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
+    verify(coordinator).shutdown(arg.capture());
+    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
+  }
+
+  // Tests the case where the tasks publishing to the intermediate stream are a subset of all tasks:
+  // task 0 consumes the input stream, while task 1 consumes only the intermediate stream.
+  @Test
+  public void testPropogateWith2Tasks() {
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
+
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+
+    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
+    SystemStream ints = new SystemStream("test-system", "int-stream");
+    SystemStreamPartition ssp1 = new SystemStreamPartition(input1, new Partition(0));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(ints, new Partition(0));
+
+    TaskName t0 = new TaskName("task 0");
+    TaskName t1 = new TaskName("task 1");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    streamToTasks.put(ssp1.getSystemStream(), t0.getTaskName());
+    streamToTasks.put(ssp2.getSystemStream(), t1.getTaskName());
+
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+
+    // both managers are built without a collector, so manager0's send is stubbed out below
+    EndOfStreamManager manager0 = spy(
+        new EndOfStreamManager(t0.getTaskName(), listener, streamToTasks, Collections.singleton(ssp1), metadataCache, null));
+    EndOfStreamManager manager1 = spy(
+        new EndOfStreamManager(t1.getTaskName(), listener, streamToTasks, Collections.singleton(ssp2), metadataCache, null));
+
+    TaskCoordinator coordinator0 = mock(TaskCoordinator.class);
+    TaskCoordinator coordinator1 = mock(TaskCoordinator.class);
+
+    // ssp1 end-of-stream
+    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
+    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), coordinator0);
+    // only task 0 consumes the node's input stream, so the task count sent downstream is 1
+    ArgumentCaptor<Integer> argument = ArgumentCaptor.forClass(Integer.class);
+    verify(manager0).sendEndOfStream(any(), argument.capture());
+    assertEquals(1, argument.getValue().intValue());
+    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
+    verify(coordinator0).shutdown(arg.capture());
+    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
+
+    // int1 end-of-stream
+    IncomingMessageEnvelope intEos = new IncomingMessageEnvelope(ssp2, null, null, new EndOfStreamMessage(t0.getTaskName(), 1));
+    manager1.update(intEos, coordinator1);
+    verify(manager1, never()).sendEndOfStream(any(), anyInt());
+    arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
+    verify(coordinator1).shutdown(arg.capture());
+    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
new file mode 100644
index 0000000..39c56c3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.control;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.control.IOGraph.IONode;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link IOGraph}: building it from a {@link StreamGraphImpl} and looking up the nodes that
+ * consume a given stream. Also provides {@link #buildSimpleIOGraph} for other tests in this package.
+ */
+public class TestIOGraph {
+  StreamSpec input1;
+  StreamSpec input2;
+  StreamSpec input3;
+  StreamSpec output1;
+  StreamSpec output2;
+  StreamSpec int1;
+  StreamSpec int2;
+
+  StreamGraphImpl streamGraph;
+
+  @Before
+  public void setup() {
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    Config config = new MapConfig(configMap);
+
+    /*
+     * The graph built below. partitionBy on input2 produces the intermediate stream int1, and
+     * partitionBy on input3 produces the intermediate stream int2:
+     *
+     *   input1 -> map --------------------------> join -> output1
+     *                                             ^
+     *   input2 -> partitionBy -> filter ----------+
+     *                                             v
+     *   input3 -> filter -> partitionBy -> map -> join -> output2
+     */
+    input1 = new StreamSpec("input1", "input1", "system1");
+    input2 = new StreamSpec("input2", "input2", "system2");
+    input3 = new StreamSpec("input3", "input3", "system2");
+
+    output1 = new StreamSpec("output1", "output1", "system1");
+    output2 = new StreamSpec("output2", "output2", "system2");
+
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+    // intermediate streams used in tests
+    int1 = new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system");
+    int2 = new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system");
+    when(runner.getStreamSpec("test-app-1-partition_by-3"))
+        .thenReturn(int1);
+    when(runner.getStreamSpec("test-app-1-partition_by-8"))
+        .thenReturn(int2);
+
+    streamGraph = new StreamGraphImpl(runner, config);
+    BiFunction msgBuilder = mock(BiFunction.class);
+    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    Function mockFn = mock(Function.class);
+    OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
+    OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+
+    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(om1);
+    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+  }
+
+  @Test
+  public void testBuildIOGraph() {
+    // one node per output stream (output1, output2, int1, int2), with inputs as drawn in setup()
+    IOGraph ioGraph = streamGraph.toIOGraph();
+    assertEquals(4, ioGraph.getNodes().size());
+
+    for (IONode node : ioGraph.getNodes()) {
+      if (node.getOutput().equals(output1)) {
+        assertEquals(2, node.getInputs().size());
+        assertFalse(node.isOutputIntermediate());
+        StreamSpec[] inputs = sort(node.getInputs());
+        assertEquals(input1, inputs[0]);
+        assertEquals(int1, inputs[1]);
+      } else if (node.getOutput().equals(output2)) {
+        assertEquals(2, node.getInputs().size());
+        assertFalse(node.isOutputIntermediate());
+        StreamSpec[] inputs = sort(node.getInputs());
+        assertEquals(int1, inputs[0]);
+        assertEquals(int2, inputs[1]);
+      } else if (node.getOutput().equals(int1)) {
+        assertEquals(1, node.getInputs().size());
+        assertTrue(node.isOutputIntermediate());
+        StreamSpec[] inputs = sort(node.getInputs());
+        assertEquals(input2, inputs[0]);
+      } else if (node.getOutput().equals(int2)) {
+        assertEquals(1, node.getInputs().size());
+        assertTrue(node.isOutputIntermediate());
+        StreamSpec[] inputs = sort(node.getInputs());
+        assertEquals(input3, inputs[0]);
+      } else {
+        fail("Unexpected IO node with output " + node.getOutput());
+      }
+    }
+  }
+
+  @Test
+  public void testNodesOfInput() {
+    // each stream maps to the nodes consuming it; int1 feeds both joins
+    IOGraph ioGraph = streamGraph.toIOGraph();
+    Collection<IONode> nodes = ioGraph.getNodesOfInput(input1.toSystemStream());
+    assertEquals(1, nodes.size());
+    IONode node = nodes.iterator().next();
+    assertEquals(output1, node.getOutput());
+    assertEquals(2, node.getInputs().size());
+    assertFalse(node.isOutputIntermediate());
+
+    nodes = ioGraph.getNodesOfInput(input2.toSystemStream());
+    assertEquals(1, nodes.size());
+    node = nodes.iterator().next();
+    assertEquals(int1, node.getOutput());
+    assertEquals(1, node.getInputs().size());
+    assertTrue(node.isOutputIntermediate());
+
+    nodes = ioGraph.getNodesOfInput(int1.toSystemStream());
+    assertEquals(2, nodes.size());
+    nodes.forEach(n -> assertEquals(2, n.getInputs().size()));
+
+    nodes = ioGraph.getNodesOfInput(input3.toSystemStream());
+    assertEquals(1, nodes.size());
+    node = nodes.iterator().next();
+    assertEquals(int2, node.getOutput());
+    assertEquals(1, node.getInputs().size());
+    assertTrue(node.isOutputIntermediate());
+
+    nodes = ioGraph.getNodesOfInput(int2.toSystemStream());
+    assertEquals(1, nodes.size());
+    node = nodes.iterator().next();
+    assertEquals(output2, node.getOutput());
+    assertEquals(2, node.getInputs().size());
+    assertFalse(node.isOutputIntermediate());
+  }
+
+  /** Returns {@code specs} as an array ordered by stream id, so assertions don't depend on set order. */
+  private static StreamSpec[] sort(Set<StreamSpec> specs) {
+    StreamSpec[] array = specs.toArray(new StreamSpec[specs.size()]);
+    Arrays.sort(array, Comparator.comparing(StreamSpec::getId));
+    return array;
+  }
+
+  /**
+   * Builds an {@link IOGraph} with a single node that reads all of {@code inputs} and writes to {@code output}.
+   *
+   * @param inputs input streams of the node
+   * @param output output stream of the node
+   * @param isOutputIntermediate whether {@code output} is an intermediate stream
+   * @return a graph containing only that node
+   */
+  public static IOGraph buildSimpleIOGraph(List<StreamSpec> inputs,
+      StreamSpec output,
+      boolean isOutputIntermediate) {
+    IONode node = new IONode(output, isOutputIntermediate);
+    inputs.forEach(input -> node.addInput(input));
+    return new IOGraph(Collections.singleton(node));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
new file mode 100644
index 0000000..8fe7a16
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.control; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.container.TaskName; +import org.apache.samza.message.WatermarkMessage; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.when; + + +public class TestWatermarkManager { + + StreamMetadataCache metadataCache; + + @Before + public void setup() { + SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); + partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); + metadataCache = mock(StreamMetadataCache.class); + when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); + } + + @Test + public void testUpdateFromInputSource() { + SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + TaskName taskName = new TaskName("Task 0"); + Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); + streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); + ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); + when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); + WatermarkManager manager = new WatermarkManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null); + long time = System.currentTimeMillis(); + Watermark watermark = manager.update(WatermarkManager.buildWatermarkEnvelope(time, ssp)); + assertEquals(watermark.getTimestamp(), time); + } + + @Test + public void testUpdateFromIntermediateStream() { + SystemStreamPartition[] ssps = new SystemStreamPartition[3]; + ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new 
Partition(0)); + ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0)); + ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1)); + + TaskName taskName = new TaskName("Task 0"); + Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); + for (SystemStreamPartition ssp : ssps) { + streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); + } + ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); + when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); + + WatermarkManager manager = new WatermarkManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null); + int envelopeCount = 4; + IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount]; + + long[] time = {300L, 200L, 100L, 400L}; + for (int i = 0; i < envelopeCount; i++) { + envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new WatermarkMessage(time[i], "task " + i, envelopeCount)); + } + for (int i = 0; i < 3; i++) { + assertNull(manager.update(envelopes[i])); + } + // verify the first three messages won't result in end-of-stream + assertEquals(manager.getWatermarkTime(ssps[0]), WatermarkManager.TIME_NOT_EXIST); + // the fourth message will generate a watermark + Watermark watermark = manager.update(envelopes[3]); + assertNotNull(watermark); + assertEquals(watermark.getTimestamp(), 100); + assertEquals(manager.getWatermarkTime(ssps[1]), WatermarkManager.TIME_NOT_EXIST); + assertEquals(manager.getWatermarkTime(ssps[2]), WatermarkManager.TIME_NOT_EXIST); + + + // stream2 has two partitions assigned to this task, so it requires a message from each partition to calculate watermarks + long[] time1 = {300L, 200L, 100L, 400L}; + envelopes = new IncomingMessageEnvelope[envelopeCount]; + for (int i = 0; i < envelopeCount; i++) { + envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "", new 
WatermarkMessage(time1[i], "task " + i, envelopeCount)); + } + // verify the messages for the partition 0 won't generate watermark + for (int i = 0; i < 4; i++) { + assertNull(manager.update(envelopes[i])); + } + assertEquals(manager.getWatermarkTime(ssps[1]), 100L); + + long[] time2 = {350L, 150L, 500L, 80L}; + for (int i = 0; i < envelopeCount; i++) { + envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "", new WatermarkMessage(time2[i], "task " + i, envelopeCount)); + } + for (int i = 0; i < 3; i++) { + assertNull(manager.update(envelopes[i])); + } + assertEquals(manager.getWatermarkTime(ssps[2]), WatermarkManager.TIME_NOT_EXIST); + // the fourth message will generate the watermark + watermark = manager.update(envelopes[3]); + assertNotNull(watermark); + assertEquals(manager.getWatermarkTime(ssps[2]), 80L); + assertEquals(watermark.getTimestamp(), 80L); + } + + @Test + public void testSendWatermark() { + SystemStream ints = new SystemStream("test-system", "int-stream"); + SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); + partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); + StreamMetadataCache metadataCache = mock(StreamMetadataCache.class); + when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); + + MessageCollector collector = mock(MessageCollector.class); + ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); 
+ when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); + + WatermarkManager manager = new WatermarkManager("task 0", + listener, + HashMultimap.create(), + Collections.EMPTY_SET, + metadataCache, + collector); + + long time = System.currentTimeMillis(); + Set<Integer> partitions = new HashSet<>(); + doAnswer(invocation -> { + OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0]; + partitions.add((Integer) envelope.getPartitionKey()); + WatermarkMessage watermarkMessage = (WatermarkMessage) envelope.getMessage(); + assertEquals(watermarkMessage.getTaskName(), "task 0"); + assertEquals(watermarkMessage.getTaskCount(), 8); + assertEquals(watermarkMessage.getTimestamp(), time); + return null; + }).when(collector).send(any()); + + manager.sendWatermark(time, ints, 8); + assertEquals(partitions.size(), 4); + } + + @Test + public void testPropagate() { + StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system"); + List<StreamSpec> inputs = new ArrayList<>(); + inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system")); + inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system")); + + IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true); + + SystemStream input1 = new SystemStream("test-system", "input-stream-1"); + SystemStream input2 = new SystemStream("test-system", "input-stream-2"); + SystemStream input3 = new SystemStream("test-system", "input-stream-3"); + SystemStream ints = new SystemStream("test-system", "int-stream"); + SystemStreamPartition[] ssps0 = new SystemStreamPartition[3]; + ssps0[0] = new SystemStreamPartition(input1, new Partition(0)); + ssps0[1] = new SystemStreamPartition(input2, new Partition(0)); + ssps0[2] = new SystemStreamPartition(ints, new Partition(0)); + + SystemStreamPartition[] ssps1 = new SystemStreamPartition[4]; + ssps1[0] = new SystemStreamPartition(input1, new Partition(1)); + ssps1[1] = new 
SystemStreamPartition(input2, new Partition(1)); + ssps1[2] = new SystemStreamPartition(input3, new Partition(1)); + ssps1[3] = new SystemStreamPartition(ints, new Partition(1)); + + SystemStreamPartition[] ssps2 = new SystemStreamPartition[2]; + ssps2[0] = new SystemStreamPartition(input3, new Partition(2)); + ssps2[1] = new SystemStreamPartition(ints, new Partition(2)); + + + TaskName t0 = new TaskName("task 0"); //consume input1 and input2 + TaskName t1 = new TaskName("task 1"); //consume input 1 and input2 and input 3 + TaskName t2 = new TaskName("task 2"); //consume input2 and input 3 + Multimap<SystemStream, String> inputToTasks = HashMultimap.create(); + for (SystemStreamPartition ssp : ssps0) { + inputToTasks.put(ssp.getSystemStream(), t0.getTaskName()); + } + for (SystemStreamPartition ssp : ssps1) { + inputToTasks.put(ssp.getSystemStream(), t1.getTaskName()); + } + for (SystemStreamPartition ssp : ssps2) { + inputToTasks.put(ssp.getSystemStream(), t2.getTaskName()); + } + + ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); + when(listener.getIOGraph()).thenReturn(ioGraph); + WatermarkManager manager = spy( + new WatermarkManager(t0.getTaskName(), listener, inputToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null)); + + IncomingMessageEnvelope envelope = WatermarkManager.buildWatermarkEnvelope(System.currentTimeMillis(), ssps0[0]); + doNothing().when(manager).sendWatermark(anyLong(), any(), anyInt()); + Watermark watermark = manager.update(envelope); + assertNotNull(watermark); + long time = System.currentTimeMillis(); + Watermark updatedWatermark = watermark.copyWithTimestamp(time); + updatedWatermark.propagate(ints); + ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor<SystemStream> arg2 = ArgumentCaptor.forClass(SystemStream.class); + ArgumentCaptor<Integer> arg3 = ArgumentCaptor.forClass(Integer.class); + verify(manager).sendWatermark(arg1.capture(), arg2.capture(), arg3.capture()); + 
assertEquals(arg1.getValue().longValue(), time); + assertEquals(arg2.getValue(), ints); + assertEquals(arg3.getValue().intValue(), 2); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index d50d271..3ae8f5b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,6 +18,7 @@ */ package org.apache.samza.operators.impl; +import java.util.Set; import org.apache.samza.config.Config; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; @@ -210,5 +211,22 @@ public class TestOperatorImpl { super(OpCode.INPUT, 1); } } + + public static Set<OperatorImpl> getNextOperators(OperatorImpl op) { + return op.registeredOperators; + } + + public static OperatorSpec.OpCode getOpCode(OperatorImpl op) { + return op.getOperatorSpec().getOpCode(); + } + + public static long getInputWatermark(OperatorImpl op) { + return op.getInputWatermarkTime(); + } + + public static long getOutputWatermark(OperatorImpl op) { + return op.getOutputWatermarkTime(); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 6a8d765..fc1259c 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -88,9 +88,7 @@ public class TestStreamProcessor { } @Override - SamzaContainer createSamzaContainer( - ContainerModel containerModel, - int maxChangelogStreamPartitions) { + SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { if (container == null) { RunLoop mockRunLoop = mock(RunLoop.class); doAnswer(invocation -> http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index a04bd3b..4be4e73 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -19,6 +19,7 @@ package org.apache.samza.runtime; +import java.util.Set; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; @@ -330,7 +331,6 @@ public class TestLocalApplicationRunner { return null; }).when(sp).start(); - LocalApplicationRunner spy = spy(runner); doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture()); @@ -343,4 +343,8 @@ public class TestLocalApplicationRunner { assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish); } + public static Set<StreamProcessor> getProcessors(LocalApplicationRunner runner) { + return runner.getProcessors(); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java index 5b76bba..7192525 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java @@ -26,7 +26,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import org.apache.samza.message.EndOfStreamMessage; import org.apache.samza.message.WatermarkMessage; -import org.apache.samza.message.MessageType; +import org.apache.samza.message.IntermediateMessageType; import org.apache.samza.serializers.IntermediateMessageSerde; import org.apache.samza.serializers.Serde; import org.junit.Test; @@ -96,7 +96,7 @@ public class TestIntermediateMessageSerde { TestUserMessage userMessage = new TestUserMessage(msg, 0, System.currentTimeMillis()); byte[] bytes = imserde.toBytes(userMessage); TestUserMessage de = (TestUserMessage) imserde.fromBytes(bytes); - assertEquals(MessageType.of(de), MessageType.USER_MESSAGE); + assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.USER_MESSAGE); assertEquals(de.getMessage(), msg); assertEquals(de.getOffset(), 0); assertTrue(de.getTimestamp() > 0); @@ -109,7 +109,7 @@ public class TestIntermediateMessageSerde { WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), taskName, 8); byte[] bytes = imserde.toBytes(watermark); WatermarkMessage de = (WatermarkMessage) imserde.fromBytes(bytes); - assertEquals(MessageType.of(de), MessageType.WATERMARK); + assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.WATERMARK_MESSAGE); assertEquals(de.getTaskName(), taskName); assertEquals(de.getTaskCount(), 8); assertTrue(de.getTimestamp() > 0); @@ -123,7 +123,7 @@ public class TestIntermediateMessageSerde { 
EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8); byte[] bytes = imserde.toBytes(eos); EndOfStreamMessage de = (EndOfStreamMessage) imserde.fromBytes(bytes); - assertEquals(MessageType.of(de), MessageType.END_OF_STREAM); + assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.END_OF_STREAM_MESSAGE); assertEquals(de.getTaskName(), taskName); assertEquals(de.getTaskCount(), 8); assertEquals(de.getVersion(), 1); http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 1afc26a..03931f1 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -39,6 +39,7 @@ import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskInstanceExceptionHandler; import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; +import org.apache.samza.control.EndOfStreamManager; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; @@ -78,15 +79,15 @@ public class TestAsyncRunLoop { private final IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0"); private final IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1"); private final IncomingMessageEnvelope envelope3 = new IncomingMessageEnvelope(ssp0, "1", "key0", "value0"); - private final IncomingMessageEnvelope ssp0EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0); - private final IncomingMessageEnvelope ssp1EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1); + private final IncomingMessageEnvelope ssp0EndOfStream = EndOfStreamManager.buildEndOfStreamEnvelope(ssp0); + private final IncomingMessageEnvelope ssp1EndOfStream = EndOfStreamManager.buildEndOfStreamEnvelope(ssp1); TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) { TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap()); scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet(); return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics, null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), - manager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>())); + manager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, null); } TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) { @@ -569,7 +570,7 @@ public class TestAsyncRunLoop { SystemStreamPartition ssp2 = new SystemStreamPartition("system1", "stream2", p2); IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", "key1", "message1"); IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", "key1", "message1"); - IncomingMessageEnvelope envelope3 = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2); + IncomingMessageEnvelope envelope3 = EndOfStreamManager.buildEndOfStreamEnvelope(ssp2); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new HashMap<>(); List<IncomingMessageEnvelope> messageList = new ArrayList<>(); 
http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java new file mode 100644 index 0000000..45b08d7 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.task; + +import org.apache.samza.operators.impl.OperatorImplGraph; + + +public class TestStreamOperatorTask { + + public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask task) { + return task.getOperatorImplGraph(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 40974a6..9025077 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -19,12 +19,21 @@ package org.apache.samza.container + +import java.util +import java.util +import java.util.Collections import java.util.concurrent.ConcurrentHashMap +import com.google.common.collect.Multimap import org.apache.samza.SamzaException import org.apache.samza.Partition import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.config.Config import org.apache.samza.config.MapConfig +import org.apache.samza.control.ControlMessageUtils +import org.apache.samza.job.model.ContainerModel +import org.apache.samza.job.model.JobModel +import org.apache.samza.job.model.TaskModel import org.apache.samza.metrics.Counter import org.apache.samza.metrics.Metric import org.apache.samza.metrics.MetricsRegistryMap @@ -354,6 +363,36 @@ class TestTaskInstance { val expected = List(envelope1, envelope2, envelope4) assertEquals(expected, result.toList) } + + @Test + def testBuildInputToTasks = { + val system: String = "test-system" + val stream0: String = "test-stream-0" + val stream1: String = "test-stream-1" + + val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0)) + val ssp1: 
SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1)) + val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0)) + + val task0: TaskName = new TaskName("Task 0") + val task1: TaskName = new TaskName("Task 1") + val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition] + ssps.add(ssp0) + ssps.add(ssp2) + val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0)) + val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0)) + val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1)) + val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1)) + + val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel] + cms.put(cm0.getProcessorId, cm0) + cms.put(cm1.getProcessorId, cm1) + + val jobModel: JobModel = new JobModel(new MapConfig, cms, null) + val streamToTasks: Multimap[SystemStream, String] = TaskInstance.buildInputToTasks(jobModel) + assertEquals(streamToTasks.get(ssp0.getSystemStream).size, 2) + assertEquals(streamToTasks.get(ssp2.getSystemStream).size, 1) + } } class MockSystemAdmin extends SystemAdmin { http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java index fb9bb56..de0d1da 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java @@ -36,6 +36,7 @@ import org.apache.commons.lang.Validate; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; 
+import org.apache.samza.control.EndOfStreamManager; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.IncomingMessageEnvelope; @@ -236,7 +237,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { consumerMetrics.incNumEvents(systemStreamPartition); consumerMetrics.incTotalNumEvents(); } - offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition)); + offerMessage(systemStreamPartition, EndOfStreamManager.buildEndOfStreamEnvelope(systemStreamPartition)); reader.close(); } http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java b/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java new file mode 100644 index 0000000..08e866e --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.processor; + +import org.apache.samza.container.SamzaContainer; + +public class TestStreamProcessorUtil { + public static SamzaContainer getContainer(StreamProcessor processor) { + return processor.getContainer(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java new file mode 100644 index 0000000..26abb13 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.controlmessages; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.test.controlmessages.TestData.PageView; +import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import org.apache.samza.test.util.ArraySystemFactory; +import org.apache.samza.test.util.Base64Serializer; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +/** + * This test uses an array as a bounded input source, and does a partitionBy() and sink() after reading the input. + * It verifies the pipeline will stop and the number of output messages should equal to the input. 
+ */ +public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { + + + private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"}; + + @Test + public void testPipeline() throws Exception { + Random random = new Random(); + int count = 100; + PageView[] pageviews = new PageView[count]; + for (int i = 0; i < count; i++) { + String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)]; + int memberId = random.nextInt(10); + pageviews[i] = new PageView(pagekey, memberId); + } + + int partitionCount = 4; + Map<String, String> configs = new HashMap<>(); + configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName()); + configs.put("streams.PageView.samza.system", "test"); + configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews)); + configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); + + configs.put(JobConfig.JOB_NAME(), "test-eos-job"); + configs.put(JobConfig.PROCESSOR_ID(), "1"); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + + configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); + configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); + configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); + configs.put("systems.kafka.samza.key.serde", "int"); + configs.put("systems.kafka.samza.msg.serde", "json"); + configs.put("systems.kafka.default.stream.replication.factor", "1"); + configs.put("job.default.system", "kafka"); + + configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory"); + configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + List<PageView> 
received = new ArrayList<>(); + final StreamApplication app = (streamGraph, cfg) -> { + streamGraph.getInputStream("PageView", (k, v) -> (PageView) v) + .partitionBy(PageView::getMemberId) + .sink((m, collector, coordinator) -> { + received.add(m); + }); + }; + runner.run(app); + runner.waitForFinish(); + + assertEquals(received.size(), count * partitionCount); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java new file mode 100644 index 0000000..8541b55 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.controlmessages; + +import java.io.Serializable; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +public class TestData { + + public static class PageView implements Serializable { + @JsonProperty("pageKey") + final String pageKey; + @JsonProperty("memberId") + final int memberId; + + @JsonProperty("pageKey") + public String getPageKey() { + return pageKey; + } + + @JsonProperty("memberId") + public int getMemberId() { + return memberId; + } + + @JsonCreator + public PageView(@JsonProperty("pageKey") String pageKey, @JsonProperty("memberId") int memberId) { + this.pageKey = pageKey; + this.memberId = memberId; + } + } + + public static class PageViewJsonSerdeFactory implements SerdeFactory<PageView> { + @Override + public Serde<PageView> getSerde(String name, Config config) { + return new PageViewJsonSerde(); + } + } + + public static class PageViewJsonSerde implements Serde<PageView> { + ObjectMapper mapper = new ObjectMapper(); + + @Override + public PageView fromBytes(byte[] bytes) { + try { + return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<PageView>() { }); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public byte[] toBytes(PageView pv) { + try { + return mapper.writeValueAsString(pv).getBytes("UTF-8"); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java new file mode 100644 index 0000000..58da8bd --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.controlmessages; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.TaskInstance; +import org.apache.samza.container.TaskName; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.control.EndOfStreamManager; +import org.apache.samza.control.WatermarkManager; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.operators.impl.InputOperatorImpl; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.operators.impl.OperatorImplGraph; +import org.apache.samza.operators.impl.TestOperatorImpl; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.processor.StreamProcessor; +import org.apache.samza.processor.TestStreamProcessorUtil; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.TestLocalApplicationRunner; +import org.apache.samza.serializers.IntegerSerdeFactory; +import org.apache.samza.serializers.StringSerdeFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.AsyncStreamTaskAdapter; +import 
org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TestStreamOperatorTask;
+import org.apache.samza.test.controlmessages.TestData.PageView;
+import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.SimpleSystemAdmin;
+import org.apache.samza.test.util.TestStreamConsumer;
+import org.junit.Test;
+import scala.collection.JavaConverters;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+/**
+ * End-to-end test of watermark propagation through a repartitioning job.
+ *
+ * <p>A two-partition test input stream emits page views interleaved with watermarks and
+ * end-of-stream markers. The job repartitions the views by member id and sinks them; after the
+ * job finishes, the test inspects each task's operator graph and checks the input and output
+ * watermarks recorded on the partitionBy and sink operators.
+ */
+public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
+
+  // Next offset handed out by createIncomingMessage; only data messages consume offsets.
+  private static int offset = 1;
+  private static final String TEST_SYSTEM = "test";
+  private static final String TEST_STREAM = "PageView";
+  private static final int PARTITION_COUNT = 2;
+  private static final SystemStreamPartition SSP0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+  private static final SystemStreamPartition SSP1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+
+  // Input for both partitions: SSP0 sees watermarks 1 then 4, SSP1 sees 2 then 3, and each
+  // partition ends with an end-of-stream marker.
+  private final static List<IncomingMessageEnvelope> TEST_DATA = new ArrayList<>();
+  static {
+    TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
+    TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(1, SSP0));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(2, SSP1));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(4, SSP0));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(3, SSP1));
+    TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
+    TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
+    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP0));
+    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP1));
+  }
+
+  /** Test system whose consumer replays {@link #TEST_DATA} and which provides no producer. */
+  public final static class TestSystemFactory implements SystemFactory {
+    @Override
+    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+      return new TestStreamConsumer(TEST_DATA);
+    }
+
+    @Override
+    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+      return null;
+    }
+
+    @Override
+    public SystemAdmin getAdmin(String systemName, Config config) {
+      return new SimpleSystemAdmin(config);
+    }
+  }
+
+  private static IncomingMessageEnvelope createIncomingMessage(Object message, SystemStreamPartition ssp) {
+    return new IncomingMessageEnvelope(ssp, String.valueOf(offset++), "", message);
+  }
+
+  @Test
+  public void testWatermark() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
+
+    configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configs.put("systems.kafka.samza.key.serde", "int");
+    configs.put("systems.kafka.samza.msg.serde", "json");
+    configs.put("systems.kafka.default.stream.replication.factor", "1");
+    configs.put("job.default.system", "kafka");
+
+    configs.put("serializers.registry.int.class", IntegerSerdeFactory.class.getName());
+    configs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
+    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    List<PageView> received = new ArrayList<>();
+    final StreamApplication app = (streamGraph, cfg) -> {
+      streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
+          .partitionBy(PageView::getMemberId)
+          .sink((m, collector, coordinator) -> {
+              received.add(m);
+            });
+    };
+    runner.run(app);
+    Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
+
+    runner.waitForFinish();
+
+    OperatorImplGraph graph0 = getTaskGraph(tasks, "Partition 0");
+    assertWatermarks(graph0, OperatorSpec.OpCode.PARTITION_BY, 4);
+    assertWatermarks(graph0, OperatorSpec.OpCode.SINK, 3);
+
+    OperatorImplGraph graph1 = getTaskGraph(tasks, "Partition 1");
+    assertWatermarks(graph1, OperatorSpec.OpCode.PARTITION_BY, 3);
+    assertWatermarks(graph1, OperatorSpec.OpCode.SINK, 3);
+  }
+
+  /** Returns the operator graph of the named task, failing the test if no such task exists. */
+  private static OperatorImplGraph getTaskGraph(Map<String, StreamOperatorTask> tasks, String taskName) {
+    StreamOperatorTask task = tasks.get(taskName);
+    assertNotNull("No task named " + taskName, task);
+    return TestStreamOperatorTask.getOperatorImplGraph(task);
+  }
+
+  /**
+   * Asserts that the operator with {@code opCode} found by {@link #getOperator} has both its input
+   * and output watermark equal to {@code expected}.
+   */
+  private void assertWatermarks(OperatorImplGraph graph, OperatorSpec.OpCode opCode, long expected) {
+    OperatorImpl op = getOperator(graph, opCode);
+    assertNotNull("No operator with op code " + opCode, op);
+    // JUnit's assertEquals takes (expected, actual); the original calls had them reversed.
+    assertEquals(opCode + " input watermark", expected, TestOperatorImpl.getInputWatermark(op));
+    assertEquals(opCode + " output watermark", expected, TestOperatorImpl.getOutputWatermark(op));
+  }
+
+  /**
+   * Returns the {@link StreamOperatorTask} of every task in the runner's first processor, keyed by
+   * task name.
+   *
+   * <p>Each task instance holds an {@link AsyncStreamTaskAdapter}; the underlying task is read from
+   * the adapter's private {@code wrappedTask} field via reflection.
+   */
+  Map<String, StreamOperatorTask> getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception {
+    StreamProcessor processor = TestLocalApplicationRunner.getProcessors(runner).iterator().next();
+    SamzaContainer container = TestStreamProcessorUtil.getContainer(processor);
+    Map<TaskName, TaskInstance> taskInstances = JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava();
+    Map<String, StreamOperatorTask> tasks = new HashMap<>();
+    for (Map.Entry<TaskName, TaskInstance> entry : taskInstances.entrySet()) {
+      AsyncStreamTaskAdapter adapter = (AsyncStreamTaskAdapter) entry.getValue().task();
+      Field field = AsyncStreamTaskAdapter.class.getDeclaredField("wrappedTask");
+      field.setAccessible(true);
+      StreamOperatorTask task = (StreamOperatorTask) field.get(adapter);
+      tasks.put(entry.getKey().getTaskName(), task);
+    }
+    return tasks;
+  }
+
+  /**
+   * Finds the first operator with the given op code reachable from the graph's input operators.
+   *
+   * <p>From each input operator it walks a single chain, following only the first successor at each
+   * step, so branching graphs are not fully searched. Returns {@code null} if nothing matches.
+   */
+  OperatorImpl getOperator(OperatorImplGraph graph, OperatorSpec.OpCode opCode) {
+    for (InputOperatorImpl input : graph.getAllInputOperators()) {
+      Set<OperatorImpl> nextOps = TestOperatorImpl.getNextOperators(input);
+      while (!nextOps.isEmpty()) {
+        OperatorImpl op = nextOps.iterator().next();
+        if (TestOperatorImpl.getOpCode(op) == opCode) {
+          return op;
+        } else {
+          nextOps = TestOperatorImpl.getNextOperators(op);
+        }
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
new file mode 100644
index 0000000..9b96216
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.util; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.samza.config.Config; +import org.apache.samza.control.EndOfStreamManager; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemStreamPartition; + +/** + * A simple implementation of array system consumer + */ +public class ArraySystemConsumer implements SystemConsumer { + boolean done = false; + private final Config config; + + public ArraySystemConsumer(Config config) { + this.config = config; + } + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public void register(SystemStreamPartition systemStreamPartition, String s) { + } + + @Override + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long l) throws InterruptedException { + if (!done) { + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>(); + set.forEach(ssp -> { + List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config)) + .map(object -> new IncomingMessageEnvelope(ssp, null, null, object)).collect(Collectors.toList()); + envelopes.add(EndOfStreamManager.buildEndOfStreamEnvelope(ssp)); + envelopeMap.put(ssp, envelopes); + }); + done = true; + return envelopeMap; + } else { + return Collections.emptyMap(); + } + + } + + private static Object[] getArrayObjects(String stream, Config config) { + try { + return Base64Serializer.deserialize(config.get("streams." 
+ stream + ".source"), Object[].class); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java new file mode 100644 index 0000000..0632865 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.test.util;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+
+/**
+ * {@link SystemFactory} for read-only test systems whose input is an array carried in config.
+ *
+ * <p>Admins are {@link SimpleSystemAdmin}s, consumers are {@link ArraySystemConsumer}s, and
+ * {@link #getProducer} always returns {@code null}.
+ */
+public class ArraySystemFactory implements SystemFactory {
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SimpleSystemAdmin(config);
+  }
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry metricsRegistry) {
+    final SystemConsumer consumer = new ArraySystemConsumer(config);
+    return consumer;
+  }
+
+  /** The system is read-only, so no producer is ever created. */
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry metricsRegistry) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java b/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java
new file mode 100644
index 0000000..1a17a3d
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Base64; + + +public class Base64Serializer { + private Base64Serializer() {} + + public static String serializeUnchecked(Serializable serializable) { + try { + return serialize(serializable); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static String serialize(Serializable serializable) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(serializable); + oos.close(); + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + public static <T> T deserializeUnchecked(String serialized, Class<T> klass) { + try { + return deserialize(serialized, klass); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static <T> T deserialize(String serialized, Class<T> klass) throws IOException, ClassNotFoundException { + final byte[] bytes = Base64.getDecoder().decode(serialized); + final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + @SuppressWarnings("unchecked") + T object = (T) ois.readObject(); + ois.close(); + return object; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java new file mode 100644 index 0000000..41f01c5 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.test.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A minimal {@link SystemAdmin} for in-memory test systems.
+ *
+ * <p>Each stream is reported with {@code streams.<stream>.partitionCount} partitions (default 1),
+ * all with null oldest/newest/upcoming offsets. Changelog and coordinator stream operations are
+ * not supported.
+ */
+public class SimpleSystemAdmin implements SystemAdmin {
+  private final Config config;
+
+  public SimpleSystemAdmin(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Maps every partition in {@code offsets} to {@code null}: this admin cannot compute the offset
+   * that follows a given one.
+   *
+   * <p>A HashMap is filled directly because {@code Collectors.toMap} rejects null values (and a
+   * null value mapper throws NullPointerException on the first entry).
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+    for (SystemStreamPartition ssp : offsets.keySet()) {
+      offsetsAfter.put(ssp, null);
+    }
+    return offsetsAfter;
+  }
+
+  /**
+   * Builds metadata for each stream with {@code streams.<stream>.partitionCount} partitions
+   * (default 1), every partition having null offsets.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return streamNames.stream()
+        .collect(Collectors.toMap(
+            Function.<String>identity(),
+            streamName -> {
+              Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>();
+              int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1);
+              for (int i = 0; i < partitionCount; i++) {
+                metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
+              }
+              return new SystemStreamMetadata(streamName, metadataMap);
+            }));
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void validateChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Orders offsets as follows: null first, then offsets that parse as a {@code long} in numeric
+   * order, then all remaining offsets in lexicographic order.
+   *
+   * <p>Plain string comparison would put "10" before "9". Numeric and non-numeric offsets are kept
+   * in separate groups so that the ordering stays transitive.
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (offset1 == null) {
+      return offset2 == null ? 0 : -1;
+    } else if (offset2 == null) {
+      return 1;
+    }
+    Long numeric1 = parseLongOrNull(offset1);
+    Long numeric2 = parseLongOrNull(offset2);
+    if (numeric1 != null && numeric2 != null) {
+      return Long.compare(numeric1, numeric2);
+    } else if (numeric1 != null) {
+      return -1;
+    } else if (numeric2 != null) {
+      return 1;
+    }
+    return offset1.compareTo(offset2);
+  }
+
+  /** Parses {@code offset} as a long, returning {@code null} if it is not a valid long. */
+  private static Long parseLongOrNull(String offset) {
+    try {
+      return Long.parseLong(offset);
+    } catch (NumberFormatException ignored) {
+      // Not numeric: the caller falls back to lexicographic ordering.
+      return null;
+    }
+  }
+}
+
