Repository: kafka Updated Branches: refs/heads/trunk 8b9b07e5d -> 3a58407e2
http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 51276f3..7c158e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -131,21 +131,21 @@ public class KTableSourceTest { driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); - proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); - proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)"); + proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); driver.process(topic1, "A", "03"); - proc1.checkAndClearResult("A:(03<-null)"); + proc1.checkAndClearProcessResult("A:(03<-null)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); + proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } finally { Utils.delete(stateDir); @@ -176,21 +176,21 @@ public class KTableSourceTest { driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); - proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); - proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)"); + proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)"); driver.process(topic1, "A", "03"); - proc1.checkAndClearResult("A:(03<-02)"); + proc1.checkAndClearProcessResult("A:(03<-02)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)"); + proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)"); } finally { Utils.delete(stateDir); http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 5bf1b5e..a1c07af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -60,17 +59,17 @@ public class PartitionGroupTest { // add three 3 records with timestamp 1, 3, 5 to partition-1 List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); group.addRawRecords(partition1, list1); // add three 3 records with timestamp 2, 4, 6 to partition-2 List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue), + new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue), + new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue)); group.addRawRecords(partition2, list2); @@ -82,7 +81,7 @@ public class PartitionGroupTest { StampedRecord record; PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); - // get one record + // get one record, now the time should be advanced record = group.nextRecord(info); assertEquals(partition1, info.partition()); assertEquals(1L, record.timestamp); @@ -99,5 +98,72 @@ public class PartitionGroupTest { assertEquals(2, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); assertEquals(3L, group.timestamp()); + + // add three 3 records with timestamp 2, 4, 6 to partition-1 again + List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue)); + + group.addRawRecords(partition1, list3); + + assertEquals(6, group.numBuffered()); + assertEquals(4, group.numBuffered(partition1)); + assertEquals(2, group.numBuffered(partition2)); + assertEquals(3L, group.timestamp()); + + // get one record, time should not be advanced + record = group.nextRecord(info); + assertEquals(partition1, info.partition()); + assertEquals(3L, record.timestamp); + assertEquals(5, group.numBuffered()); + assertEquals(3, group.numBuffered(partition1)); + assertEquals(2, group.numBuffered(partition2)); + assertEquals(3L, group.timestamp()); + + // get one more record, now time should be advanced + record = group.nextRecord(info); + assertEquals(partition1, info.partition()); + assertEquals(5L, record.timestamp); + assertEquals(4, group.numBuffered()); + assertEquals(2, group.numBuffered(partition1)); + assertEquals(2, group.numBuffered(partition2)); + assertEquals(3L, group.timestamp()); + + // get one more record, time should not be advanced + record = group.nextRecord(info); + assertEquals(partition1, info.partition()); + assertEquals(2L, record.timestamp); + assertEquals(3, group.numBuffered()); + assertEquals(1, group.numBuffered(partition1)); + assertEquals(2, group.numBuffered(partition2)); + assertEquals(4L, group.timestamp()); + + // get one more record, now time should be advanced + record = group.nextRecord(info); + assertEquals(partition2, info.partition()); + assertEquals(4L, record.timestamp); + assertEquals(2, group.numBuffered()); + assertEquals(1, group.numBuffered(partition1)); + assertEquals(1, group.numBuffered(partition2)); + assertEquals(4L, group.timestamp()); + + // get one more record, time should not be advanced + record = group.nextRecord(info); + assertEquals(partition1, info.partition()); + assertEquals(4L, record.timestamp); + assertEquals(1, group.numBuffered()); + assertEquals(0, group.numBuffered(partition1)); + assertEquals(1, group.numBuffered(partition2)); + assertEquals(4L, group.timestamp()); + + // get one more record, time should not be advanced + record = group.nextRecord(info); + assertEquals(partition2, info.partition()); + assertEquals(6L, record.timestamp); + assertEquals(0, group.numBuffered()); + assertEquals(0, group.numBuffered(partition1)); + assertEquals(0, group.numBuffered(partition2)); + assertEquals(4L, group.timestamp()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 33fa5c4..dd48947 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; import org.junit.Test; @@ -46,6 +47,7 @@ import java.util.Properties; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class StreamTaskTest { @@ -58,10 +60,12 @@ public class StreamTaskTest { private final TopicPartition partition2 = new TopicPartition("topic2", 1); private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2); - private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10L); + private final ProcessorTopology topology = new ProcessorTopology( - Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2), + Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode) processor), new HashMap<String, SourceNode>() { { put("topic1", source1); @@ -94,6 +98,8 @@ public class StreamTaskTest { @Before public void setup() { consumer.assign(Arrays.asList(partition1, partition2)); + source1.addChild(processor); + source2.addChild(processor); } @SuppressWarnings("unchecked") @@ -211,6 +217,73 @@ public class StreamTaskTest { } } + @SuppressWarnings("unchecked") + @Test + public void testMaybePunctuate() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamsConfig config = createConfig(baseDir); + StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + assertTrue(task.maybePunctuate()); + + assertEquals(5, task.process()); + assertEquals(1, source1.numReceived); + assertEquals(0, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + assertEquals(4, task.process()); + assertEquals(1, source1.numReceived); + assertEquals(1, source2.numReceived); + + assertTrue(task.maybePunctuate()); + + assertEquals(3, task.process()); + assertEquals(2, source1.numReceived); + assertEquals(1, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + assertEquals(2, task.process()); + assertEquals(2, source1.numReceived); + assertEquals(2, source2.numReceived); + + assertTrue(task.maybePunctuate()); + + assertEquals(1, task.process()); + assertEquals(3, source1.numReceived); + assertEquals(2, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + assertEquals(0, task.process()); + assertEquals(3, source1.numReceived); + assertEquals(3, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L); + + task.close(); + + } finally { + Utils.delete(baseDir); + } + } + private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) { return Arrays.asList(recs); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index d3b8081..287af5a 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -143,7 +143,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S @Override public void schedule(long interval) { - throw new UnsupportedOperationException("schedule() not supported"); + throw new UnsupportedOperationException("schedule() not supported."); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java new file mode 100644 index 0000000..cf8a526 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.processor.internals.ProcessorNode; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +public class MockProcessorNode<K, V> extends ProcessorNode<K, V> { + + public static final String NAME = "MOCK-PROCESS-"; + public static final AtomicInteger INDEX = new AtomicInteger(1); + + public int numReceived = 0; + + public final MockProcessorSupplier<K, V> supplier; + + public MockProcessorNode(long scheduleInterval) { + this(new MockProcessorSupplier<K, V>(scheduleInterval)); + } + + private MockProcessorNode(MockProcessorSupplier<K, V> supplier) { + super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet()); + + this.supplier = supplier; + } + + @Override + public void process(K key, V value) { + this.numReceived++; + processor().process(key, value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java index b402525..921c365 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java @@ -17,6 +17,7 @@ package org.apache.kafka.test; +import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -30,16 +31,28 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> { public final ArrayList<String> processed = new ArrayList<>(); public final ArrayList<Long> punctuated = new ArrayList<>(); + private final long scheduleInterval; + + public MockProcessorSupplier() { + this(-1L); + } + + public MockProcessorSupplier(long scheduleInterval) { + this.scheduleInterval = scheduleInterval; + } + @Override public Processor<K, V> get() { return new MockProcessor(); } - public class MockProcessor implements Processor<K, V> { + public class MockProcessor extends AbstractProcessor<K, V> { @Override public void init(ProcessorContext context) { - // do nothing + super.init(context); + if (scheduleInterval > 0L) + context.schedule(scheduleInterval); } @Override @@ -49,21 +62,30 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> { @Override public void punctuate(long streamTime) { + assertEquals(streamTime, context().timestamp()); + assertEquals(null, context().topic()); + assertEquals(-1, context().partition()); + assertEquals(-1L, context().offset()); + punctuated.add(streamTime); } + } - @Override - public void close() { - // do nothing + public void checkAndClearProcessResult(String... expected) { + assertEquals("the number of outputs:", expected.length, processed.size()); + + for (int i = 0; i < expected.length; i++) { + assertEquals("output[" + i + "]:", expected[i], processed.get(i)); } + processed.clear(); } - public void checkAndClearResult(String... expected) { - assertEquals("the number of outputs:", expected.length, processed.size()); + public void checkAndClearPunctuateResult(long... expected) { + assertEquals("the number of outputs:", expected.length, punctuated.size()); for (int i = 0; i < expected.length; i++) { - assertEquals("output[" + i + "]:", expected[i], processed.get(i)); + assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i)); } processed.clear();
