This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc54c0e  KAFKA-3625: TopologyTestDriver must process output for wall-clock-time punctuations and on close() (#4502)
dc54c0e is described below

commit dc54c0e24b3f7ca27990b1d576b8b8fd6d740ca1
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Feb 9 15:29:21 2018 -0800

    KAFKA-3625: TopologyTestDriver must process output for wall-clock-time punctuations and on close() (#4502)
    
    Author: Matthias J. Sax <matth...@confluent.io>
    
    Reviewers: Damian Guy <dam...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../apache/kafka/streams/TopologyTestDriver.java   |  64 +++++----
 .../kafka/streams/TopologyTestDriverTest.java      | 159 ++++++++++++++++++++-
 2 files changed, 188 insertions(+), 35 deletions(-)
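
For context, here is a minimal sketch of the behavior this change enables. It mirrors the tests added below, but the topology wiring, the topic name "output-topic", and the topology/config variables are hypothetical stand-ins. Before this commit, records emitted by a wall-clock-time punctuation stayed inside the driver's mock producer and were never captured, so readOutput() returned null even after advanceWallClockTime():

    // Assumes a topology whose processor schedules a WALL_CLOCK_TIME
    // punctuation that forwards records to "output-topic"; the deserializers
    // are from org.apache.kafka.common.serialization.
    final TopologyTestDriver driver = new TopologyTestDriver(topology, config);

    // Firing the punctuation now also triggers captureOutputRecords(), which
    // moves emitted records from the mock producer into the driver's
    // per-topic output queues ...
    driver.advanceWallClockTime(60000L);

    // ... so they can be read back; before this fix, this returned null.
    final ProducerRecord<String, Long> output = driver.readOutput(
        "output-topic", new StringDeserializer(), new LongDeserializer());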

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index ff63554..a108f22 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -328,42 +328,14 @@ public class TopologyTestDriver {
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
                 consumerRecord.value())));
-            producer.clear();
 
             // Process the record ...
             ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
             task.process();
             task.maybePunctuateStreamTime();
             task.commit();
+            captureOutputRecords();
 
-            // Capture all the records sent to the producer ...
-            for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
-                Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
-                if (outputRecords == null) {
-                    outputRecords = new LinkedList<>();
-                    outputRecordsByTopic.put(record.topic(), outputRecords);
-                }
-                outputRecords.add(record);
-
-                // Forward back into the topology if the produced record is to an internal or a source topic ...
-                final String outputTopicName = record.topic();
-                if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
-                    final byte[] serializedKey = record.key();
-                    final byte[] serializedValue = record.value();
-
-                    pipeInput(new ConsumerRecord<>(
-                        outputTopicName,
-                        -1,
-                        -1L,
-                        record.timestamp(),
-                        TimestampType.CREATE_TIME,
-                        0L,
-                        serializedKey == null ? 0 : serializedKey.length,
-                        serializedValue == null ? 0 : serializedValue.length,
-                        serializedKey,
-                        serializedValue));
-                }
-            }
         } else {
             final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
             if (globalTopicPartition == null) {
@@ -385,6 +357,38 @@ public class TopologyTestDriver {
         }
     }
 
+    private void captureOutputRecords() {
+        // Capture all the records sent to the producer ...
+        final List<ProducerRecord<byte[], byte[]>> output = producer.history();
+        producer.clear();
+        for (final ProducerRecord<byte[], byte[]> record : output) {
+            Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
+            if (outputRecords == null) {
+                outputRecords = new LinkedList<>();
+                outputRecordsByTopic.put(record.topic(), outputRecords);
+            }
+            outputRecords.add(record);
+
+            // Forward back into the topology if the produced record is to an internal or a source topic ...
+            final String outputTopicName = record.topic();
+            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
+                final byte[] serializedKey = record.key();
+                final byte[] serializedValue = record.value();
+
+                pipeInput(new ConsumerRecord<>(
+                    outputTopicName,
+                    -1,
+                    -1L,
+                    record.timestamp(),
+                    TimestampType.CREATE_TIME,
+                    0L,
+                    serializedKey == null ? 0 : serializedKey.length,
+                    serializedValue == null ? 0 : serializedValue.length,
+                    serializedKey,
+                    serializedValue));
+            }
+        }
+    }
+
     /**
      * Send input messages to the topology and then commit each message individually.
      *
@@ -407,6 +411,7 @@ public class TopologyTestDriver {
         mockTime.sleep(advanceMs);
         task.maybePunctuateSystemTime();
         task.commit();
+        captureOutputRecords();
     }
 
     /**
@@ -558,6 +563,7 @@ public class TopologyTestDriver {
                 // ignore
             }
         }
+        captureOutputRecords();
     }
 
     static class MockTime implements Time {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 921f6d6..17d5e02 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -19,8 +19,12 @@ package org.apache.kafka.streams;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.processor.Processor;
@@ -28,12 +32,15 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -81,6 +88,14 @@ public class TopologyTestDriverTest {
             put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         }
     };
+    private KeyValueStore<String, Long> store;
+
+    private StringDeserializer stringDeserializer = new StringDeserializer();
+    private LongDeserializer longDeserializer = new LongDeserializer();
+    private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
+        new StringSerializer(),
+        new LongSerializer());
+
 
     private final static class Record {
         private Object key;
@@ -223,7 +238,9 @@ public class TopologyTestDriverTest {
 
     @After
     public void tearDown() {
-        testDriver.close();
+        if (testDriver != null) {
+            testDriver.close();
+        }
     }
 
     private Topology setupSourceSinkTopology() {
@@ -417,7 +434,7 @@ public class TopologyTestDriverTest {
             SINK_TOPIC_1,
             new Serializer() {
                 @Override
-                public byte[] serialize(String topic, Object data) {
+                public byte[] serialize(final String topic, final Object data) {
                     if (data instanceof Long) {
                         return Serdes.Long().serializer().serialize(topic, (Long) data);
                     }
@@ -426,11 +443,11 @@ public class TopologyTestDriverTest {
                 @Override
                 public void close() {}
                 @Override
-                public void configure(Map configs, boolean isKey) {}
+                public void configure(final Map configs, final boolean isKey) {}
             },
             new Serializer() {
                 @Override
-                public byte[] serialize(String topic, Object data) {
+                public byte[] serialize(final String topic, final Object data) {
                     if (data instanceof String) {
                         return Serdes.String().serializer().serialize(topic, (String) data);
                     }
@@ -439,7 +456,7 @@ public class TopologyTestDriverTest {
                 @Override
                 public void close() {}
                 @Override
-                public void configure(Map configs, boolean isKey) {}
+                public void configure(final Map configs, final boolean isKey) {}
             },
             processor);
 
@@ -476,7 +493,7 @@ public class TopologyTestDriverTest {
     }
 
     @Test
-    public void shouldUseSinkeSpecificSerializers() {
+    public void shouldUseSinkSpecificSerializers() {
         final Topology topology = new Topology();
 
         final String sourceName1 = "source-1";
@@ -691,4 +708,134 @@ public class TopologyTestDriverTest {
         expectedStoreNames.add("globalStore");
         assertThat(testDriver.getAllStateStores().keySet(), equalTo(expectedStoreNames));
     }
+
+    private void setup() {
+        Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
+            "aggregator");
+        topology.addSink("sinkProcessor", "result-topic", "aggregator");
+
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
+    @Test
+    public void shouldFlushStoreForFirstInput() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldNotUpdateStoreForSmallerValue() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        Assert.assertThat(store.get("a"), equalTo(21L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldUpdateStoreForLargerValue() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
+        Assert.assertThat(store.get("a"), equalTo(42L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldUpdateStoreForNewKey() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
+        Assert.assertThat(store.get("b"), equalTo(21L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldPunctuateIfEventTimeAdvances() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldPunctuateIfWallClockTimeAdvances() {
+        setup();
+        testDriver.advanceWallClockTime(60000);
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    private class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+        @Override
+        public Processor<String, Long> get() {
+            return new CustomMaxAggregator();
+        }
+    }
+
+    private class CustomMaxAggregator implements Processor<String, Long> {
+        ProcessorContext context;
+        private KeyValueStore<String, Long> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+            context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                @Override
+                public void punctuate(final long timestamp) {
+                    flushStore();
+                }
+            });
+            context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
+                @Override
+                public void punctuate(final long timestamp) {
+                    flushStore();
+                }
+            });
+            store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
+        }
+
+        @Override
+        public void process(final String key, final Long value) {
+            final Long oldValue = store.get(key);
+            if (oldValue == null || value > oldValue) {
+                store.put(key, value);
+            }
+        }
+
+        private void flushStore() {
+            final KeyValueIterator<String, Long> it = store.all();
+            while (it.hasNext()) {
+                final KeyValue<String, Long> next = it.next();
+                context.forward(next.key, next.value);
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {}
+
+        @Override
+        public void close() {}
+    }
 }
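
Likewise, a short sketch (same hypothetical names as above) of the close() path fixed here: records the task emits while committing and flushing during close() are now captured as well, and they can still be read afterwards, because readOutput() only drains the driver's in-memory queues:

    driver.pipeInput(recordFactory.create("input-topic", "a", 1L));
    driver.close();  // commits and flushes; records emitted here are captured

    // Still readable after close(), since readOutput() does not touch the task:
    final ProducerRecord<String, Long> flushed = driver.readOutput(
        "output-topic", new StringDeserializer(), new LongDeserializer());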

-- 
To stop receiving notification emails like this one, please contact mj...@apache.org.
