http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java deleted file mode 100644 index 8fe7a16..0000000 --- a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.control; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.samza.Partition; -import org.apache.samza.container.TaskName; -import org.apache.samza.message.WatermarkMessage; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamMetadata; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -public class TestWatermarkManager { - - StreamMetadataCache metadataCache; - - @Before - public void setup() { - SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); - Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); - partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); - metadataCache = mock(StreamMetadataCache.class); - when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); - } - - @Test - public void testUpdateFromInputSource() { - SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); - TaskName taskName = new TaskName("Task 0"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); - WatermarkManager manager = new WatermarkManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null); - long time = System.currentTimeMillis(); - Watermark watermark = manager.update(WatermarkManager.buildWatermarkEnvelope(time, ssp)); - assertEquals(watermark.getTimestamp(), time); - } - - @Test - public void testUpdateFromIntermediateStream() { - SystemStreamPartition[] ssps = new SystemStreamPartition[3]; - ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0)); - ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0)); - ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1)); - - TaskName taskName = new TaskName("Task 0"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - for (SystemStreamPartition ssp : ssps) { - streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); - } - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); - - WatermarkManager manager = new WatermarkManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null); - int envelopeCount = 4; - IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount]; - - long[] time = {300L, 200L, 100L, 400L}; - for (int i = 0; i < envelopeCount; i++) { - envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new WatermarkMessage(time[i], "task " + i, envelopeCount)); - } - for (int i = 0; i < 3; i++) { - assertNull(manager.update(envelopes[i])); - } - // verify the first three messages won't result in end-of-stream - assertEquals(manager.getWatermarkTime(ssps[0]), WatermarkManager.TIME_NOT_EXIST); - // the fourth message will generate a watermark - Watermark watermark = manager.update(envelopes[3]); - assertNotNull(watermark); - assertEquals(watermark.getTimestamp(), 100); - assertEquals(manager.getWatermarkTime(ssps[1]), WatermarkManager.TIME_NOT_EXIST); - assertEquals(manager.getWatermarkTime(ssps[2]), WatermarkManager.TIME_NOT_EXIST); - - - // stream2 has two partitions assigned to this task, so it requires a message from each partition to calculate watermarks - long[] time1 = {300L, 200L, 100L, 400L}; - envelopes = new IncomingMessageEnvelope[envelopeCount]; - for (int i = 0; i < envelopeCount; i++) { - envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "", new WatermarkMessage(time1[i], "task " + i, envelopeCount)); - } - // verify the messages for the partition 0 won't generate watermark - for (int i = 0; i < 4; i++) { - assertNull(manager.update(envelopes[i])); - } - assertEquals(manager.getWatermarkTime(ssps[1]), 100L); - - long[] time2 = {350L, 150L, 500L, 80L}; - for (int i = 0; i < envelopeCount; i++) { - envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "", new WatermarkMessage(time2[i], "task " + i, envelopeCount)); - } - for (int i = 0; i < 3; i++) { - assertNull(manager.update(envelopes[i])); - } - assertEquals(manager.getWatermarkTime(ssps[2]), WatermarkManager.TIME_NOT_EXIST); - // the fourth message will generate the watermark - watermark = manager.update(envelopes[3]); - assertNotNull(watermark); - assertEquals(manager.getWatermarkTime(ssps[2]), 80L); - assertEquals(watermark.getTimestamp(), 80L); - } - - @Test - public void testSendWatermark() { - SystemStream ints = new SystemStream("test-system", "int-stream"); - SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); - Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); - partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); - StreamMetadataCache metadataCache = mock(StreamMetadataCache.class); - when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); - - MessageCollector collector = mock(MessageCollector.class); - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); - - WatermarkManager manager = new WatermarkManager("task 0", - listener, - HashMultimap.create(), - Collections.EMPTY_SET, - metadataCache, - collector); - - long time = System.currentTimeMillis(); - Set<Integer> partitions = new HashSet<>(); - doAnswer(invocation -> { - OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0]; - partitions.add((Integer) envelope.getPartitionKey()); - WatermarkMessage watermarkMessage = (WatermarkMessage) envelope.getMessage(); - assertEquals(watermarkMessage.getTaskName(), "task 0"); - assertEquals(watermarkMessage.getTaskCount(), 8); - assertEquals(watermarkMessage.getTimestamp(), time); - return null; - }).when(collector).send(any()); - - manager.sendWatermark(time, ints, 8); - assertEquals(partitions.size(), 4); - } - - @Test - public void testPropagate() { - StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system"); - List<StreamSpec> inputs = new ArrayList<>(); - inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system")); - inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system")); - - IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true); - - SystemStream input1 = new SystemStream("test-system", "input-stream-1"); - SystemStream input2 = new SystemStream("test-system", "input-stream-2"); - SystemStream input3 = new SystemStream("test-system", "input-stream-3"); - SystemStream ints = new SystemStream("test-system", "int-stream"); - SystemStreamPartition[] ssps0 = new SystemStreamPartition[3]; - ssps0[0] = new SystemStreamPartition(input1, new Partition(0)); - ssps0[1] = new SystemStreamPartition(input2, new Partition(0)); - ssps0[2] = new SystemStreamPartition(ints, new Partition(0)); - - SystemStreamPartition[] ssps1 = new SystemStreamPartition[4]; - ssps1[0] = new SystemStreamPartition(input1, new Partition(1)); - ssps1[1] = new SystemStreamPartition(input2, new Partition(1)); - ssps1[2] = new SystemStreamPartition(input3, new Partition(1)); - ssps1[3] = new SystemStreamPartition(ints, new Partition(1)); - - SystemStreamPartition[] ssps2 = new SystemStreamPartition[2]; - ssps2[0] = new SystemStreamPartition(input3, new Partition(2)); - ssps2[1] = new SystemStreamPartition(ints, new Partition(2)); - - - TaskName t0 = new TaskName("task 0"); //consume input1 and input2 - TaskName t1 = new TaskName("task 1"); //consume input 1 and input2 and input 3 - TaskName t2 = new TaskName("task 2"); //consume input2 and input 3 - Multimap<SystemStream, String> inputToTasks = HashMultimap.create(); - for (SystemStreamPartition ssp : ssps0) { - inputToTasks.put(ssp.getSystemStream(), t0.getTaskName()); - } - for (SystemStreamPartition ssp : ssps1) { - inputToTasks.put(ssp.getSystemStream(), t1.getTaskName()); - } - for (SystemStreamPartition ssp : ssps2) { - inputToTasks.put(ssp.getSystemStream(), t2.getTaskName()); - } - - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(ioGraph); - WatermarkManager manager = spy( - new WatermarkManager(t0.getTaskName(), listener, inputToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null)); - - IncomingMessageEnvelope envelope = WatermarkManager.buildWatermarkEnvelope(System.currentTimeMillis(), ssps0[0]); - doNothing().when(manager).sendWatermark(anyLong(), any(), anyInt()); - Watermark watermark = manager.update(envelope); - assertNotNull(watermark); - long time = System.currentTimeMillis(); - Watermark updatedWatermark = watermark.copyWithTimestamp(time); - updatedWatermark.propagate(ints); - ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor<SystemStream> arg2 = ArgumentCaptor.forClass(SystemStream.class); - ArgumentCaptor<Integer> arg3 = ArgumentCaptor.forClass(Integer.class); - verify(manager).sendWatermark(arg1.capture(), arg2.capture(), arg3.capture()); - assertEquals(arg1.getValue().longValue(), time); - assertEquals(arg2.getValue(), ints); - assertEquals(arg3.getValue().intValue(), 2); - } -}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index c51b1ea..af0b786 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -32,6 +32,7 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.container.TaskContextImpl; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.TestClock; @@ -253,7 +254,7 @@ public class TestJoinOperator { when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem")); when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2")); - TaskContext taskContext = mock(TaskContext.class); + TaskContextImpl taskContext = mock(TaskContextImpl.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index ca8a151..1edc5f6 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -26,6 +26,7 @@ import junit.framework.Assert; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.triggers.Trigger; @@ -41,7 +42,6 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.TestClock; import org.junit.Before; @@ -60,13 +60,13 @@ public class TestWindowOperator { private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); private Config config; - private TaskContext taskContext; + private TaskContextImpl taskContext; private ApplicationRunner runner; @Before public void setup() throws Exception { config = mock(Config.class); - taskContext = mock(TaskContext.class); + taskContext = mock(TaskContextImpl.class); runner = mock(ApplicationRunner.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java new file mode 100644 index 0000000..d17d751 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.WatermarkMessage; +import org.apache.samza.task.MessageCollector; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestControlMessageSender { + + @Test + public void testSend() { + SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); + partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); + when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); + StreamMetadataCache metadataCache = mock(StreamMetadataCache.class); + when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); + + SystemStream systemStream = new SystemStream("test-system", "test-stream"); + Set<Integer> partitions = new HashSet<>(); + MessageCollector collector = mock(MessageCollector.class); + doAnswer(invocation -> { + OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0]; + partitions.add((Integer) envelope.getPartitionKey()); + assertEquals(envelope.getSystemStream(), systemStream); + return null; + }).when(collector).send(any()); + + ControlMessageSender sender = new ControlMessageSender(metadataCache); + WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), "task 0"); + sender.send(watermark, systemStream, collector); + assertEquals(partitions.size(), 4); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java new file mode 100644 index 0000000..887991f --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class TestEndOfStreamStates { + + @Test + public void testUpdate() { + SystemStream input = new SystemStream("system", "input"); + SystemStream intermediate = new SystemStream("system", "intermediate"); + + Set<SystemStreamPartition> ssps = new HashSet<>(); + SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0)); + SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0)); + SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1)); + ssps.add(inputPartition0); + ssps.add(intPartition0); + ssps.add(intPartition1); + + Map<SystemStream, Integer> producerCounts = new HashMap<>(); + producerCounts.put(intermediate, 2); + + EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ssps, producerCounts); + assertFalse(endOfStreamStates.isEndOfStream(input)); + assertFalse(endOfStreamStates.isEndOfStream(intermediate)); + assertFalse(endOfStreamStates.allEndOfStream()); + + IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope(inputPartition0); + endOfStreamStates.update((EndOfStreamMessage) envelope.getMessage(), envelope.getSystemStreamPartition()); + assertTrue(endOfStreamStates.isEndOfStream(input)); + assertFalse(endOfStreamStates.isEndOfStream(intermediate)); + assertFalse(endOfStreamStates.allEndOfStream()); + + EndOfStreamMessage eos = new EndOfStreamMessage("task 0"); + endOfStreamStates.update(eos, intPartition0); + endOfStreamStates.update(eos, intPartition1); + assertFalse(endOfStreamStates.isEndOfStream(intermediate)); + assertFalse(endOfStreamStates.allEndOfStream()); + + eos = new EndOfStreamMessage("task 1"); + endOfStreamStates.update(eos, intPartition0); + endOfStreamStates.update(eos, intPartition1); + assertTrue(endOfStreamStates.isEndOfStream(intermediate)); + assertTrue(endOfStreamStates.allEndOfStream()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 3ae8f5b..4a78da8 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,21 +18,22 @@ */ package org.apache.samza.operators.impl; +import java.util.Collection; +import java.util.Collections; import java.util.Set; import org.apache.samza.config.Config; +import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import java.util.Collection; -import java.util.Collections; - import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; @@ -47,7 +48,7 @@ public class TestOperatorImpl { @Test(expected = IllegalStateException.class) public void testMultipleInitShouldThrow() { OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class)); - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); opImpl.init(mock(Config.class), mockTaskContext); opImpl.init(mock(Config.class), mockTaskContext); @@ -61,7 +62,7 @@ public class TestOperatorImpl { @Test public void testOnMessagePropagatesResults() { - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Object mockTestOpImplOutput = mock(Object.class); @@ -93,8 +94,8 @@ public class TestOperatorImpl { @Test public void testOnMessageUpdatesMetrics() { - TaskContext mockTaskContext = mock(TaskContext.class); - MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); + ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); Counter mockCounter = mock(Counter.class); Timer mockTimer = mock(Timer.class); @@ -117,7 +118,7 @@ public class TestOperatorImpl { @Test public void testOnTimerPropagatesResultsAndTimer() { - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Object mockTestOpImplOutput = mock(Object.class); @@ -153,8 +154,8 @@ public class TestOperatorImpl { @Test public void testOnTimerUpdatesMetrics() { - TaskContext mockTaskContext = mock(TaskContext.class); - MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); + ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); Counter mockMessageCounter = mock(Counter.class); Timer mockTimer = mock(Timer.class); @@ -210,6 +211,11 @@ public class TestOperatorImpl { TestOpSpec() { super(OpCode.INPUT, 1); } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } } public static Set<OperatorImpl> getNextOperators(OperatorImpl op) { @@ -221,11 +227,11 @@ public class TestOperatorImpl { } public static long getInputWatermark(OperatorImpl op) { - return op.getInputWatermarkTime(); + return op.getInputWatermark(); } public static long getOutputWatermark(OperatorImpl op) { - return op.getOutputWatermarkTime(); + return op.getOutputWatermark(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 4505eef..9fab1b7 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -19,8 +19,23 @@ package org.apache.samza.operators.impl; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.Partition; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; @@ -32,6 +47,7 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -62,7 +78,7 @@ public class TestOperatorImplGraph { public void testEmptyChain() { StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class)); OperatorImplGraph opGraph = - new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContext.class), mock(Clock.class)); + new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class)); assertEquals(0, opGraph.getAllInputOperators().size()); } @@ -82,8 +98,9 @@ public class TestOperatorImplGraph { .map(mock(MapFunction.class)) .sendTo(outputStream); - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0")); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); @@ -113,17 +130,17 @@ public class TestOperatorImplGraph { inputStream.filter(mock(FilterFunction.class)); inputStream.map(mock(MapFunction.class)); - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); assertEquals(2, inputOpImpl.registeredOperators.size()); - assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl -> - ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER)); - assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl -> - ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP)); + assertTrue(inputOpImpl.registeredOperators.stream() + .anyMatch(opImpl -> ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER)); + assertTrue(inputOpImpl.registeredOperators.stream() + .anyMatch(opImpl -> ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP)); } @Test @@ -139,13 +156,13 @@ public class TestOperatorImplGraph { MapFunction mockMapFunction = mock(MapFunction.class); mergedStream.map(mockMapFunction); - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); // verify that the DAG after merge is only traversed & initialized once - verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContext.class)); + verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class)); } @Test @@ -160,13 +177,13 @@ public class TestOperatorImplGraph { MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v); inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1)); - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); // verify that join function is initialized once. - verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class)); + verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class)); InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1")); InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2")); @@ -201,7 +218,7 @@ public class TestOperatorImplGraph { when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system")); when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system")); Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); + TaskContextImpl mockContext = mock(TaskContextImpl.class); when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig); @@ -255,4 +272,144 @@ public class TestOperatorImplGraph { } }; } + + @Test + public void testGetStreamToConsumerTasks() { + String system = "test-system"; + String stream0 = "test-stream-0"; + String stream1 = "test-stream-1"; + + SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0)); + + TaskName task0 = new TaskName("Task 0"); + TaskName task1 = new TaskName("Task 1"); + Set<SystemStreamPartition> ssps = new HashSet<>(); + ssps.add(ssp0); + ssps.add(ssp2); + TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0)); + ContainerModel cm0 = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0)); + TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1)); + ContainerModel cm1 = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1)); + + Map<String, ContainerModel> cms = new HashMap<>(); + cms.put(cm0.getProcessorId(), cm0); + cms.put(cm1.getProcessorId(), cm1); + + JobModel jobModel = new JobModel(new MapConfig(), cms, null); + Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel); + assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2); + assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1); + } + + @Test + public void testGetOutputToInputStreams() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(JobConfig.JOB_NAME(), "test-app"); + configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); + Config config = new MapConfig(configMap); + + /** + * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value. + * + * input1 -> map -> join -> partitionBy (10) -> output1 + * | + * input2 -> filter -| + * | + * input3 -> filter -> partitionBy -> map -> join -> output2 + * + */ + StreamSpec input1 = new StreamSpec("input1", "input1", "system1"); + StreamSpec input2 = new StreamSpec("input2", "input2", "system2"); + StreamSpec input3 = new StreamSpec("input3", "input3", "system2"); + + StreamSpec output1 = new StreamSpec("output1", "output1", "system1"); + StreamSpec output2 = new StreamSpec("output2", "output2", "system2"); + + ApplicationRunner runner = mock(ApplicationRunner.class); + when(runner.getStreamSpec("input1")).thenReturn(input1); + when(runner.getStreamSpec("input2")).thenReturn(input2); + when(runner.getStreamSpec("input3")).thenReturn(input3); + when(runner.getStreamSpec("output1")).thenReturn(output1); + when(runner.getStreamSpec("output2")).thenReturn(output2); + + // intermediate streams used in tests + StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", "test-app-1-partition_by-10", "default-system"); + StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", "test-app-1-partition_by-6", "default-system"); + when(runner.getStreamSpec("test-app-1-partition_by-10")) + .thenReturn(int1); + when(runner.getStreamSpec("test-app-1-partition_by-6")) + .thenReturn(int2); + + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + BiFunction msgBuilder = mock(BiFunction.class); + MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); + MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).filter(m -> true); + MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + Function mockFn = mock(Function.class); + OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn); + OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn); + + m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha").sendTo(om1); + m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2); + + Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph); + Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream()); + assertEquals(inputs.size(), 2); + assertTrue(inputs.contains(input1.toSystemStream())); + assertTrue(inputs.contains(input2.toSystemStream())); + + inputs = outputToInput.get(int2.toSystemStream()); + assertEquals(inputs.size(), 1); + assertEquals(inputs.iterator().next(), input3.toSystemStream()); + } + + @Test + public void testGetProducerTaskCountForIntermediateStreams() { + /** + * the task assignment looks like the following: + * + * input1 -----> task0, task1 -----> int1 + * ^ + * input2 ------> task1, task2--------| + * v + * input3 ------> task1 -----------> int2 + * + */ + + SystemStream input1 = new SystemStream("system1", "intput1"); + SystemStream input2 = new SystemStream("system2", "intput2"); + SystemStream input3 = new SystemStream("system2", "intput3"); + + SystemStream int1 = new SystemStream("system1", "int1"); + SystemStream int2 = new SystemStream("system1", "int2"); + + + String task0 = "Task 0"; + String task1 = "Task 1"; + String task2 = "Task 2"; + + Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create(); + streamToConsumerTasks.put(input1, task0); + streamToConsumerTasks.put(input1, task1); + streamToConsumerTasks.put(input2, task1); + streamToConsumerTasks.put(input2, task2); + streamToConsumerTasks.put(input3, task1); + streamToConsumerTasks.put(int1, task0); + streamToConsumerTasks.put(int1, task1); + streamToConsumerTasks.put(int2, task0); + + Multimap<SystemStream, SystemStream> intermediateToInputStreams = HashMultimap.create(); + intermediateToInputStreams.put(int1, input1); + intermediateToInputStreams.put(int1, input2); + + intermediateToInputStreams.put(int2, input2); + intermediateToInputStreams.put(int2, input3); + + Map<SystemStream, Integer> counts = OperatorImplGraph.getProducerTaskCountForIntermediateStreams( + streamToConsumerTasks, intermediateToInputStreams); + assertTrue(counts.get(int1) == 3); + assertTrue(counts.get(int2) == 2); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java new file mode 100644 index 0000000..a726069 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.WatermarkMessage; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST; + +public class TestWatermarkStates { + + @Test + public void testUpdate() { + SystemStream input = new SystemStream("system", "input"); + SystemStream intermediate = new SystemStream("system", "intermediate"); + + Set<SystemStreamPartition> ssps = new HashSet<>(); + SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0)); + SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0)); + SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1)); + ssps.add(inputPartition0); + ssps.add(intPartition0); + ssps.add(intPartition1); + + Map<SystemStream, Integer> producerCounts = new HashMap<>(); + producerCounts.put(intermediate, 2); + + // advance watermark on input to 5 + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts); + IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L); + watermarkStates.update((WatermarkMessage) envelope.getMessage(), + envelope.getSystemStreamPartition()); + assertEquals(watermarkStates.getWatermark(input), 5L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // watermark from task 0 on int p0 to 6 + WatermarkMessage watermarkMessage = new WatermarkMessage(6L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // watermark from task 1 on int p0 to 3 + watermarkMessage = new WatermarkMessage(3L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 3L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // watermark from task 0 on int p1 to 10 + watermarkMessage = new WatermarkMessage(10L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // watermark from task 1 on int p1 to 4 + watermarkMessage = new WatermarkMessage(4L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 4L); + // verify we got a watermark 3 (min) for int stream + assertEquals(watermarkStates.getWatermark(intermediate), 3L); + + // advance watermark from task 1 on int p0 to 8 + watermarkMessage = new WatermarkMessage(8L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 6L); + // verify we got a watermark 4 (min) for int stream + assertEquals(watermarkStates.getWatermark(intermediate), 4L); + + // advance watermark from task 1 on int p1 to 7 + watermarkMessage = new WatermarkMessage(7L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 7L); + // verify we got a watermark 6 (min) for int stream + assertEquals(watermarkStates.getWatermark(intermediate), 6L); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java index 7192525..7a3faca 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java @@ -24,11 +24,11 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import org.apache.samza.message.EndOfStreamMessage; -import org.apache.samza.message.WatermarkMessage; -import org.apache.samza.message.IntermediateMessageType; import org.apache.samza.serializers.IntermediateMessageSerde; import org.apache.samza.serializers.Serde; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.MessageType; +import org.apache.samza.system.WatermarkMessage; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -96,7 +96,7 @@ public class TestIntermediateMessageSerde { TestUserMessage userMessage = new TestUserMessage(msg, 0, System.currentTimeMillis()); byte[] bytes = imserde.toBytes(userMessage); TestUserMessage de = (TestUserMessage) imserde.fromBytes(bytes); - assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.USER_MESSAGE); + assertEquals(MessageType.of(de), MessageType.USER_MESSAGE); assertEquals(de.getMessage(), msg); assertEquals(de.getOffset(), 0); assertTrue(de.getTimestamp() > 0); @@ -106,12 +106,11 @@ public class TestIntermediateMessageSerde { public void testWatermarkMessageSerde() { IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde()); String taskName = "task-1"; - WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), taskName, 8); + WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), taskName); byte[] bytes = imserde.toBytes(watermark); WatermarkMessage de = (WatermarkMessage) imserde.fromBytes(bytes); - assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.WATERMARK_MESSAGE); + assertEquals(MessageType.of(de), MessageType.WATERMARK); assertEquals(de.getTaskName(), taskName); - assertEquals(de.getTaskCount(), 8); assertTrue(de.getTimestamp() > 0); } @@ -120,12 +119,11 @@ public class TestIntermediateMessageSerde { IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde()); String streamId = "test-stream"; String taskName = "task-1"; - EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8); + EndOfStreamMessage eos = new EndOfStreamMessage(taskName); byte[] bytes = imserde.toBytes(eos); EndOfStreamMessage de = (EndOfStreamMessage) imserde.fromBytes(bytes); - assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.END_OF_STREAM_MESSAGE); + assertEquals(MessageType.of(de), MessageType.END_OF_STREAM); assertEquals(de.getTaskName(), taskName); - assertEquals(de.getTaskCount(), 8); assertEquals(de.getVersion(), 1); } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 3d4976b..5a4b4bf 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -39,7 +39,6 @@ import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskInstanceExceptionHandler; import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; -import org.apache.samza.control.EndOfStreamManager; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; @@ -78,8 +77,8 @@ public class TestAsyncRunLoop { private final IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0"); private final IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1"); private final IncomingMessageEnvelope envelope3 = new IncomingMessageEnvelope(ssp0, "1", "key0", "value0"); - private final IncomingMessageEnvelope ssp0EndOfStream = EndOfStreamManager.buildEndOfStreamEnvelope(ssp0); - private final IncomingMessageEnvelope ssp1EndOfStream = EndOfStreamManager.buildEndOfStreamEnvelope(ssp1); + private final IncomingMessageEnvelope ssp0EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0); + private final IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1); TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) { TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap()); @@ -575,7 +574,7 @@ public class TestAsyncRunLoop { SystemStreamPartition ssp2 = new SystemStreamPartition("system1", "stream2", p2); IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", "key1", "message1"); IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", "key1", "message1"); - IncomingMessageEnvelope envelope3 = EndOfStreamManager.buildEndOfStreamEnvelope(ssp2); + IncomingMessageEnvelope envelope3 = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new HashMap<>(); List<IncomingMessageEnvelope> messageList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 81f3ed1..ea11d9f 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -20,28 +20,21 @@ package org.apache.samza.container -import java.util -import java.util -import java.util.Collections import java.util.concurrent.ConcurrentHashMap -import com.google.common.collect.Multimap -import org.apache.samza.SamzaException import org.apache.samza.Partition -import org.apache.samza.checkpoint.OffsetManager -import org.apache.samza.config.Config -import org.apache.samza.config.MapConfig -import org.apache.samza.control.ControlMessageUtils -import org.apache.samza.job.model.ContainerModel -import org.apache.samza.job.model.JobModel -import org.apache.samza.job.model.TaskModel -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Metric -import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.checkpoint.{Checkpoint, OffsetManager} import org.apache.samza.config.{Config, MapConfig} import org.apache.samza.metrics.{Counter, Metric, MetricsRegistryMap} import org.apache.samza.serializers.SerdeManager +import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.system.SystemAdmin +import org.apache.samza.system.SystemConsumer +import org.apache.samza.system.SystemConsumers +import org.apache.samza.system.SystemProducer +import org.apache.samza.system.SystemProducers +import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.system._ @@ -124,9 +117,9 @@ class TestTaskInstance { */ class TroublesomeTask extends StreamTask with WindowableTask { def process( - envelope: IncomingMessageEnvelope, - collector: MessageCollector, - coordinator: TaskCoordinator) { + envelope: IncomingMessageEnvelope, + collector: MessageCollector, + coordinator: TaskCoordinator) { envelope.getOffset().toInt match { case offset if offset % 2 == 0 => throw new TroublesomeException @@ -143,8 +136,8 @@ class TestTaskInstance { * Helper method used to retrieve the value of a counter from a group. */ private def getCount( - group: ConcurrentHashMap[String, Metric], - name: String): Long = { + group: ConcurrentHashMap[String, Metric], + name: String): Long = { group.get("exception-ignored-" + name.toLowerCase).asInstanceOf[Counter].getCount } @@ -407,36 +400,6 @@ class TestTaskInstance { // Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint) } - - @Test - def testBuildInputToTasks = { - val system: String = "test-system" - val stream0: String = "test-stream-0" - val stream1: String = "test-stream-1" - - val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0)) - val ssp1: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1)) - val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0)) - - val task0: TaskName = new TaskName("Task 0") - val task1: TaskName = new TaskName("Task 1") - val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition] - ssps.add(ssp0) - ssps.add(ssp2) - val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0)) - val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0)) - val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1)) - val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1)) - - val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel] - cms.put(cm0.getProcessorId, cm0) - cms.put(cm1.getProcessorId, cm1) - - val jobModel: JobModel = new JobModel(new MapConfig, cms, null) - val streamToTasks: Multimap[SystemStream, String] = TaskInstance.buildInputToTasks(jobModel) - assertEquals(streamToTasks.get(ssp0.getSystemStream).size, 2) - assertEquals(streamToTasks.get(ssp2.getSystemStream).size, 1) - } } class MockSystemAdmin extends SystemAdmin { http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala index 9d808cb..774230c 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala @@ -20,8 +20,8 @@ package org.apache.samza.serializers -import org.apache.samza.message.EndOfStreamMessage -import org.apache.samza.message.WatermarkMessage +import org.apache.samza.system.EndOfStreamMessage +import org.apache.samza.system.WatermarkMessage import org.junit.Assert._ import org.junit.Assert.assertEquals import org.junit.Assert.assertEquals @@ -83,18 +83,17 @@ class TestSerdeManager { val eosStreamId = "eos-stream" val taskName = "task 1" val taskCount = 8 - outEnvelope = new OutgoingMessageEnvelope(intermediate, "eos", new EndOfStreamMessage(taskName, taskCount)) + outEnvelope = new OutgoingMessageEnvelope(intermediate, "eos", new EndOfStreamMessage(taskName)) se = serdeManager.toBytes(outEnvelope) inEnvelope = new IncomingMessageEnvelope(new SystemStreamPartition(intermediate, new Partition(0)), "offset", se.getKey, se.getMessage) de = serdeManager.fromBytes(inEnvelope) assertEquals(de.getKey, "eos") val eosMsg = de.getMessage.asInstanceOf[EndOfStreamMessage] assertEquals(eosMsg.getTaskName, taskName) - assertEquals(eosMsg.getTaskCount, taskCount) // test watermark message sent to intermediate stream val timestamp = System.currentTimeMillis() - outEnvelope = new OutgoingMessageEnvelope(intermediate, "watermark", new WatermarkMessage(timestamp, taskName, taskCount)) + outEnvelope = new OutgoingMessageEnvelope(intermediate, "watermark", new WatermarkMessage(timestamp, taskName)) se = serdeManager.toBytes(outEnvelope) inEnvelope = new IncomingMessageEnvelope(new SystemStreamPartition(intermediate, new Partition(0)), "offset", se.getKey, se.getMessage) de = serdeManager.fromBytes(inEnvelope) @@ -102,6 +101,5 @@ class TestSerdeManager { val watermarkMsg = de.getMessage.asInstanceOf[WatermarkMessage] assertEquals(watermarkMsg.getTimestamp, timestamp) assertEquals(watermarkMsg.getTaskName, taskName) - assertEquals(watermarkMsg.getTaskCount, taskCount) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java index de0d1da..fb9bb56 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java @@ -36,7 +36,6 @@ import org.apache.commons.lang.Validate; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.control.EndOfStreamManager; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.IncomingMessageEnvelope; @@ -237,7 +236,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { consumerMetrics.incNumEvents(systemStreamPartition); consumerMetrics.incTotalNumEvents(); } - offerMessage(systemStreamPartition, EndOfStreamManager.buildEndOfStreamEnvelope(systemStreamPartition)); + offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition)); reader.close(); } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index f313348..8493cf1 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -54,7 +54,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { @Test public void testPipeline() throws Exception { Random random = new Random(); - int count = 100; + int count = 10; PageView[] pageviews = new PageView[count]; for (int i = 0; i < count; i++) { String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)]; http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 2eb72fc..d9202d3 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -36,8 +36,6 @@ import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskName; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; -import org.apache.samza.control.EndOfStreamManager; -import org.apache.samza.control.WatermarkManager; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.operators.impl.InputOperatorImpl; import org.apache.samza.operators.impl.OperatorImpl; @@ -85,14 +83,14 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { static { TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0)); TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1)); - TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(1, SSP0)); - TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(2, SSP1)); - TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(4, SSP0)); - TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(3, SSP1)); + TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 1)); + TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 2)); + TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 4)); + TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 3)); TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0)); TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1)); - TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP0)); - TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP1)); + TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP0)); + TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP1)); } public final static class TestSystemFactory implements SystemFactory { http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java index 9b96216..832457b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java +++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.samza.config.Config; -import org.apache.samza.control.EndOfStreamManager; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemStreamPartition; @@ -62,7 +61,7 @@ public class ArraySystemConsumer implements SystemConsumer { set.forEach(ssp -> { List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config)) .map(object -> new IncomingMessageEnvelope(ssp, null, null, object)).collect(Collectors.toList()); - envelopes.add(EndOfStreamManager.buildEndOfStreamEnvelope(ssp)); + envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp)); envelopeMap.put(ssp, envelopes); }); done = true;
