[ 
https://issues.apache.org/jira/browse/KAFKA-6967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503984#comment-16503984
 ] 

ASF GitHub Bot commented on KAFKA-6967:
---------------------------------------

mjsax closed pull request #5096: KAFKA-6967: TopologyTestDriver does not allow 
pre-populating state stores that have change logging
URL: https://github.com/apache/kafka/pull/5096
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index ab16b428a21..fc4bf4fb29e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1017,6 +1017,7 @@ project(':streams:test-utils') {
 
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 4b8298fb2fa..79e0b42226d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -199,7 +199,7 @@ public void shouldReduceAndMaterializeResults() {
         final Map<String, Integer> results = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             assertReduced(results, topic, driver);
-            final KeyValueStore<String, Integer> reduce = 
(KeyValueStore<String, Integer>) driver.getStateStore("reduce");
+            final KeyValueStore<String, Integer> reduce = 
driver.getKeyValueStore("reduce");
             assertThat(reduce.get("A"), equalTo(5));
             assertThat(reduce.get("B"), equalTo(6));
         }
@@ -240,7 +240,7 @@ public void shouldAggregateAndMaterializeResults() {
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             processData(topic, driver);
-            final KeyValueStore<String, String> aggregate = 
(KeyValueStore<String, String>) driver.getStateStore("aggregate");
+            final KeyValueStore<String, String> aggregate = 
driver.getKeyValueStore("aggregate");
             assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
             assertThat(aggregate.get("2"), equalTo("0+2+2"));
         }
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e46ec6a35d0..773cbb4c323 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -170,7 +171,7 @@
 @InterfaceStability.Evolving
 public class TopologyTestDriver implements Closeable {
 
-    private final Time mockTime;
+    private final Time mockWallClockTime;
     private final InternalTopologyBuilder internalTopologyBuilder;
 
     private final static int PARTITION_ID = 0;
@@ -179,6 +180,8 @@
     private final GlobalStateUpdateTask globalStateTask;
     private final GlobalStateManager globalStateManager;
 
+    private final InternalProcessorContext context;
+
     private final StateDirectory stateDirectory;
     private final Metrics metrics;
     private final ProcessorTopology processorTopology;
@@ -216,22 +219,9 @@ public TopologyTestDriver(final Topology topology,
     public TopologyTestDriver(final Topology topology,
                               final Properties config,
                               final long initialWallClockTimeMs) {
-
         this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
     }
 
-    /**
-     * Create a new test diver instance.
-     *
-     * @param builder builder for the topology to be tested
-     * @param config the configuration for the topology
-     */
-    TopologyTestDriver(final InternalTopologyBuilder builder,
-                       final Properties config) {
-        this(builder, config,  System.currentTimeMillis());
-
-    }
-
     /**
      * Create a new test diver instance.
      *
@@ -240,10 +230,10 @@ public TopologyTestDriver(final Topology topology,
      * @param initialWallClockTimeMs the initial value of internally mocked 
wall-clock time
      */
     private TopologyTestDriver(final InternalTopologyBuilder builder,
-                              final Properties config,
-                              final long initialWallClockTimeMs) {
+                               final Properties config,
+                               final long initialWallClockTimeMs) {
         final StreamsConfig streamsConfig = new StreamsConfig(config);
-        mockTime = new MockTime(initialWallClockTimeMs);
+        mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
         
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
@@ -260,7 +250,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
         };
 
         final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        stateDirectory = new StateDirectory(streamsConfig, mockTime);
+        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
         metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
@@ -323,6 +313,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
                 new LogContext()
             );
             globalStateTask.initialize();
+            globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -342,12 +333,15 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
                 streamsMetrics,
                 stateDirectory,
                 cache,
-                mockTime,
+                mockWallClockTime,
                 producer);
             task.initializeStateStores();
             task.initializeTopology();
+            context = (InternalProcessorContext) task.context();
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
null, new RecordHeaders()));
         } else {
             task = null;
+            context = null;
         }
     }
 
@@ -356,6 +350,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
      *
      * @return Map of all metrics.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(metrics.metrics());
     }
@@ -390,13 +385,10 @@ public void pipeInput(final ConsumerRecord<byte[], 
byte[]> consumerRecord) {
                 consumerRecord.headers())));
 
             // Process the record ...
-            ((InternalProcessorContext) task.context()).setRecordContext(
-                    new ProcessorRecordContext(consumerRecord.timestamp(), 
offset, topicPartition.partition(), topicName, consumerRecord.headers()));
             task.process();
             task.maybePunctuateStreamTime();
             task.commit();
             captureOutputRecords();
-
         } else {
             final TopicPartition globalTopicPartition = 
globalPartitionsByTopic.get(topicName);
             if (globalTopicPartition == null) {
@@ -446,12 +438,7 @@ private void captureOutputRecords() {
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
         for (final ProducerRecord<byte[], byte[]> record : output) {
-            Queue<ProducerRecord<byte[], byte[]>> outputRecords = 
outputRecordsByTopic.get(record.topic());
-            if (outputRecords == null) {
-                outputRecords = new LinkedList<>();
-                outputRecordsByTopic.put(record.topic(), outputRecords);
-            }
-            outputRecords.add(record);
+            outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new 
LinkedList<>()).add(record);
 
             // Forward back into the topology if the produced record is to an 
internal or a source topic ...
             final String outputTopicName = record.topic();
@@ -497,7 +484,7 @@ public void pipeInput(final List<ConsumerRecord<byte[], 
byte[]>> records) {
      */
     @SuppressWarnings("WeakerAccess")
     public void advanceWallClockTime(final long advanceMs) {
-        mockTime.sleep(advanceMs);
+        mockWallClockTime.sleep(advanceMs);
         if (task != null) {
             task.maybePunctuateSystemTime();
             task.commit();
@@ -549,6 +536,8 @@ public void advanceWallClockTime(final long advanceMs) {
      * <p>
      * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
      * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to 
check the store afterward.
+     * <p>
+     * Note, that {@code StateStore} might be {@code null} if a store is added 
but not connected to any processor.
      *
      * @return all stores my name
      * @see #getStateStore(String)
@@ -579,13 +568,24 @@ public void advanceWallClockTime(final long advanceMs) {
      * @see #getWindowStore(String)
      * @see #getSessionStore(String)
      */
+    @SuppressWarnings("WeakerAccess")
     public StateStore getStateStore(final String name) {
-        StateStore stateStore = task == null ? null :
-            ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(name);
-        if (stateStore == null && globalStateManager != null) {
-            stateStore = globalStateManager.getGlobalStore(name);
+        if (task != null) {
+            final StateStore stateStore = ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(name);
+            if (stateStore != null) {
+                return stateStore;
+            }
         }
-        return stateStore;
+
+        if (globalStateManager != null) {
+            final StateStore stateStore = 
globalStateManager.getGlobalStore(name);
+            if (stateStore != null) {
+                return stateStore;
+            }
+
+        }
+
+        return null;
     }
 
     /**
@@ -651,6 +651,7 @@ public StateStore getStateStore(final String name) {
     /**
      * Close the driver, its topology, and all processors.
      */
+    @SuppressWarnings("WeakerAccess")
     public void close() {
         if (task != null) {
             task.close(true, false);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index b14a7915dc9..cba02573b59 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -86,23 +86,27 @@ private CapturedPunctuator(final long intervalMs, final 
PunctuationType type, fi
             this.punctuator = punctuator;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public long getIntervalMs() {
             return intervalMs;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public PunctuationType getType() {
             return type;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public Punctuator getPunctuator() {
             return punctuator;
         }
 
-        @SuppressWarnings("WeakerAccess")
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public void cancel() {
             cancelled = true;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public boolean cancelled() {
             return cancelled;
         }
@@ -127,6 +131,7 @@ private CapturedForward(final To to, final KeyValue 
keyValue) {
          *
          * @return The child name, or {@code null} if it was broadcast.
          */
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public String childName() {
             return childName;
         }
@@ -136,6 +141,7 @@ public String childName() {
          *
          * @return A timestamp, or {@code -1} if none was forwarded.
          */
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public long timestamp() {
             return timestamp;
         }
@@ -145,6 +151,7 @@ public long timestamp() {
          *
          * @return A key/value pair. Not null.
          */
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public KeyValue keyValue() {
             return keyValue;
         }
@@ -158,6 +165,7 @@ public KeyValue keyValue() {
      * and most unit tests should be able to get by with the
      * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext() {
         //noinspection DoubleBraceInitialization
         this(
@@ -179,6 +187,7 @@ public MockProcessorContext() {
      *
      * @param config a Properties object, used to configure the context and 
the processor.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext(final Properties config) {
         this(config, new TaskId(0, 0), null);
     }
@@ -190,6 +199,7 @@ public MockProcessorContext(final Properties config) {
      * @param taskId   a {@link TaskId}, which the context makes available via 
{@link MockProcessorContext#taskId()}.
      * @param stateDir a {@link File}, which the context makes available viw 
{@link MockProcessorContext#stateDir()}.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext(final Properties config, final TaskId taskId, 
final File stateDir) {
         final StreamsConfig streamsConfig = new StreamsConfig(config);
         this.taskId = taskId;
@@ -252,6 +262,7 @@ public StreamsMetrics metrics() {
      * @param offset    A record offset
      * @param timestamp A record timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setRecordMetadata(final String topic, final int partition, 
final long offset, final Headers headers, final long timestamp) {
         this.topic = topic;
         this.partition = partition;
@@ -260,13 +271,13 @@ public void setRecordMetadata(final String topic, final 
int partition, final lon
         this.timestamp = timestamp;
     }
 
-
     /**
      * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
      * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
      *
      * @param topic A topic name
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setTopic(final String topic) {
         this.topic = topic;
     }
@@ -277,21 +288,29 @@ public void setTopic(final String topic) {
      *
      * @param partition A partition number
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setPartition(final int partition) {
         this.partition = partition;
     }
 
-
     /**
      * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
      * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
      *
      * @param offset A record offset
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setOffset(final long offset) {
         this.offset = offset;
     }
 
+    /**
+     * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
+     *
+     * @param headers Record headers
+     */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setHeaders(final Headers headers) {
         this.headers = headers;
     }
@@ -302,6 +321,7 @@ public void setHeaders(final Headers headers) {
      *
      * @param timestamp A record timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setTimestamp(final long timestamp) {
         this.timestamp = timestamp;
     }
@@ -345,7 +365,6 @@ public long timestamp() {
 
     // mocks ================================================
 
-
     @Override
     public void register(final StateStore store,
                          final StateRestoreCallback 
stateRestoreCallbackIsIgnoredInMock) {
@@ -376,6 +395,7 @@ public void cancel() {
      *
      * @return A list of captured punctuators.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedPunctuator> scheduledPunctuators() {
         final LinkedList<CapturedPunctuator> capturedPunctuators = new 
LinkedList<>();
         capturedPunctuators.addAll(punctuators);
@@ -394,6 +414,7 @@ public void cancel() {
         capturedForwards.add(new CapturedForward(to, new KeyValue(key, 
value)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K, V> void forward(final K key, final V value, final int 
childIndex) {
         throw new UnsupportedOperationException(
@@ -402,6 +423,7 @@ public void cancel() {
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K, V> void forward(final K key, final V value, final String 
childName) {
         throw new UnsupportedOperationException(
@@ -417,6 +439,7 @@ public void cancel() {
      *
      * @return A list of key/value pairs that were previously passed to the 
context.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedForward> forwarded() {
         final LinkedList<CapturedForward> result = new LinkedList<>();
         result.addAll(capturedForwards);
@@ -431,6 +454,7 @@ public void cancel() {
      * @param childName The child name to retrieve forwards for
      * @return A list of key/value pairs that were previously passed to the 
context.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedForward> forwarded(final String childName) {
         final LinkedList<CapturedForward> result = new LinkedList<>();
         for (final CapturedForward capture : capturedForwards) {
@@ -444,6 +468,7 @@ public void cancel() {
     /**
      * Clear the captured forwarded data.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void resetForwards() {
         capturedForwards.clear();
     }
@@ -458,6 +483,7 @@ public void commit() {
      *
      * @return {@code true} iff {@link ProcessorContext#commit()} has been 
called in this context since construction or reset.
      */
+    @SuppressWarnings("WeakerAccess")
     public boolean committed() {
         return committed;
     }
@@ -465,6 +491,7 @@ public boolean committed() {
     /**
      * Reset the commit capture to {@code false} (whether or not it was 
previously {@code true}).
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void resetCommit() {
         committed = false;
     }
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index 507249d0d2e..108dafdfdba 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -44,7 +44,7 @@
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
     private long timeMs;
-    private long advanceMs;
+    private final long advanceMs;
 
     /**
      * Create a new factory for the given topic.
@@ -54,6 +54,7 @@
      * @param keySerializer the key serializer
      * @param valueSerializer the value serializer
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer) {
         this(null, keySerializer, valueSerializer, System.currentTimeMillis());
@@ -68,6 +69,7 @@ public ConsumerRecordFactory(final Serializer<K> 
keySerializer,
      * @param keySerializer the key serializer
      * @param valueSerializer the value serializer
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final String defaultTopicName,
                                  final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer) {
@@ -82,6 +84,7 @@ public ConsumerRecordFactory(final String defaultTopicName,
      * @param valueSerializer the value serializer
      * @param startTimestampMs the initial timestamp for generated records
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
                                  final long startTimestampMs) {
@@ -97,6 +100,7 @@ public ConsumerRecordFactory(final Serializer<K> 
keySerializer,
      * @param valueSerializer the value serializer
      * @param startTimestampMs the initial timestamp for generated records
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final String defaultTopicName,
                                  final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
@@ -112,6 +116,7 @@ public ConsumerRecordFactory(final String defaultTopicName,
      * @param startTimestampMs the initial timestamp for generated records
      * @param autoAdvanceMs the time increment pre generated record
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
                                  final long startTimestampMs,
@@ -128,6 +133,7 @@ public ConsumerRecordFactory(final Serializer<K> 
keySerializer,
      * @param startTimestampMs the initial timestamp for generated records
      * @param autoAdvanceMs the time increment pre generated record
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final String defaultTopicName,
                                  final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
@@ -147,6 +153,7 @@ public ConsumerRecordFactory(final String defaultTopicName,
      *
      * @param advanceMs the amount of time to advance
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void advanceTimeMs(final long advanceMs) {
         if (advanceMs < 0) {
             throw new IllegalArgumentException("advanceMs must be positive");
@@ -165,6 +172,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
@@ -198,6 +206,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
@@ -214,6 +223,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final long timestampMs) {
@@ -230,6 +240,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final Headers headers,
@@ -250,6 +261,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value) {
@@ -268,6 +280,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
@@ -285,6 +298,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value) {
         return create(key, value, new RecordHeaders());
@@ -299,6 +313,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final Headers headers) {
@@ -318,6 +333,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
                                                  final long timestampMs) {
@@ -334,6 +350,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
                                                  final Headers headers,
@@ -349,6 +366,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value,
                                                  final long timestampMs) {
         return create(value, new RecordHeaders(), timestampMs);
@@ -363,6 +381,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value,
                                                  final Headers headers,
                                                  final long timestampMs) {
@@ -382,6 +401,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
                                                  final Headers headers) {
@@ -396,6 +416,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value) {
         return create(topicName, null, value, new RecordHeaders());
@@ -408,6 +429,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value) {
         return create(value, new RecordHeaders());
     }
@@ -420,6 +442,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value,
                                                  final Headers headers) {
         if (topicName == null) {
@@ -437,6 +460,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param keyValues the record keys and values
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
                                                        final List<KeyValue<K, 
V>> keyValues) {
         final List<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>(keyValues.size());
@@ -455,6 +479,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param keyValues the record keys and values
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, 
V>> keyValues) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created 
without defaultTopicName. " +
@@ -474,6 +499,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param advanceMs the time difference between two consecutive generated 
records
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
                                                        final List<KeyValue<K, 
V>> keyValues,
                                                        final long 
startTimestamp,
@@ -502,6 +528,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param advanceMs the time difference between two consecutive generated 
records
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, 
V>> keyValues,
                                                        final long 
startTimestamp,
                                                        final long advanceMs) {
@@ -523,6 +550,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param startTimestamp the timestamp for the first generated record
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
                                                        final List<KeyValue<K, 
V>> keyValues,
                                                        final long 
startTimestamp) {
@@ -538,6 +566,7 @@ public void advanceTimeMs(final long advanceMs) {
      * @param startTimestamp the timestamp for the first generated record
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, 
V>> keyValues,
                                                        final long 
startTimestamp) {
         if (topicName == null) {
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
index aedb910e28c..f78e926e431 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -39,6 +39,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value is not equal 
to {@code expectedValue}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValue(final ProducerRecord<K, V> record,
                                            final V expectedValue) throws 
AssertionError {
         Objects.requireNonNull(record);
@@ -65,6 +66,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value is not equal 
to {@code expectedRecord}'s value
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValue(final ProducerRecord<K, V> record,
                                            final ProducerRecord<K, V> 
expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -82,6 +84,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s key or value is not 
equal to {@code expectedKey} or {@code expectedValue}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValue(final ProducerRecord<K, V> 
record,
                                               final K expectedKey,
                                               final V expectedValue) throws 
AssertionError {
@@ -119,6 +122,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s key or value is not 
equal to {@code expectedRecord}'s key or value
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValue(final ProducerRecord<K, V> 
record,
                                               final ProducerRecord<K, V> 
expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -136,6 +140,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or timestamp 
is not equal to {@code expectedValue} or {@code expectedTimestamp}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> 
record,
                                                     final V expectedValue,
                                                     final long 
expectedTimestamp) throws AssertionError {
@@ -169,6 +174,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or timestamp 
is not equal to {@code expectedRecord}'s value or timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> 
record,
                                                     final ProducerRecord<K, V> 
expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -189,6 +195,7 @@
      * @throws AssertionError if {@code ProducerRecord}'s key, value, 
timestamp is not equal to {@code expectedKey},
      * {@code expectedValue}, or {@code expectedTimestamps}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, 
V> record,
                                                        final K expectedKey,
                                                        final V expectedValue,
@@ -233,6 +240,7 @@
      * @throws AssertionError if {@code ProducerRecord}'s key, value, or 
timestamp is not equal to
      * {@code expectedRecord}'s key, value, or timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, 
V> record,
                                                        final ProducerRecord<K, 
V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -250,6 +258,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or headers is 
not equal to {@code expectedValue} or {@code expectedHeaders}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> 
record,
                                                   final V expectedValue,
                                                   final Headers 
expectedHeaders) throws AssertionError {
@@ -287,6 +296,7 @@
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or headers is 
not equal to {@code expectedRecord}'s value or headers
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> 
record,
                                                   final ProducerRecord<K, V> 
expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -307,6 +317,7 @@
      * @throws AssertionError if {@code ProducerRecord}'s key, value, headers 
is not equal to {@code expectedKey},
      *                        {@code expectedValue}, or {@code expectedHeaders}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, 
V> record,
                                                      final K expectedKey,
                                                      final V expectedValue,
@@ -355,6 +366,7 @@
      * @throws AssertionError if {@code ProducerRecord}'s key, value, or 
headers is not equal to
      *                        {@code expectedRecord}'s key, value, or headers
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, 
V> record,
                                                      final ProducerRecord<K, 
V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -376,6 +388,7 @@
      * @throws AssertionError if {@code ProducerRecord}'s key, value, headers 
is not equal to {@code expectedKey},
      *                        {@code expectedValue}, or {@code expectedHeaders}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeadersTimestamp(final 
ProducerRecord<K, V> record,
                                                               final K 
expectedKey,
                                                               final V 
expectedValue,
@@ -432,6 +445,7 @@
      * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, 
or timestamp is not equal to
      *                        {@code expectedRecord}'s key, value, headers, or 
timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeadersTimestamp(final 
ProducerRecord<K, V> record,
                                                               final 
ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 64d5b12dc0b..878aa357483 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -156,9 +156,9 @@ public void process(final String key, final Long value) {
     @Test
     public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
         final AbstractProcessor<String, Long> processor = new 
AbstractProcessor<String, Long>() {
+            @SuppressWarnings("deprecation")
             @Override
             public void process(final String key, final Long value) {
-                //noinspection deprecation
                 context().forward(key, value, 0);
             }
         };
@@ -178,9 +178,9 @@ public void process(final String key, final Long value) {
     @Test
     public void shouldThrowIfForwardedWithDeprecatedChildName() {
         final AbstractProcessor<String, Long> processor = new 
AbstractProcessor<String, Long>() {
+            @SuppressWarnings("deprecation")
             @Override
             public void process(final String key, final Long value) {
-                //noinspection deprecation
                 context().forward(key, value, "child1");
             }
         };
@@ -347,12 +347,7 @@ public void init(final ProcessorContext context) {
                 context.schedule(
                     1000L,
                     PunctuationType.WALL_CLOCK_TIME,
-                    new Punctuator() {
-                        @Override
-                        public void punctuate(final long timestamp) {
-                            context.commit();
-                        }
-                    }
+                    timestamp -> context.commit()
                 );
             }
 
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2d446d1de2c..7552637dc26 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -31,14 +31,15 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -64,6 +65,7 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -100,26 +102,27 @@
     };
     private KeyValueStore<String, Long> store;
 
-    private StringDeserializer stringDeserializer = new StringDeserializer();
-    private LongDeserializer longDeserializer = new LongDeserializer();
-    private ConsumerRecordFactory<String, Long> recordFactory = new 
ConsumerRecordFactory<>(
+    private final StringDeserializer stringDeserializer = new 
StringDeserializer();
+    private final LongDeserializer longDeserializer = new LongDeserializer();
+    private final ConsumerRecordFactory<String, Long> recordFactory = new 
ConsumerRecordFactory<>(
         new StringSerializer(),
         new LongSerializer());
 
 
     private final static class Record {
-        private Object key;
-        private Object value;
-        private long timestamp;
-        private long offset;
-        private String topic;
-        private Headers headers;
-
-        Record(final ConsumerRecord consumerRecord) {
+        private final Object key;
+        private final Object value;
+        private final long timestamp;
+        private final long offset;
+        private final String topic;
+        private final Headers headers;
+
+        Record(final ConsumerRecord consumerRecord,
+               final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
             timestamp = consumerRecord.timestamp();
-            offset = consumerRecord.offset();
+            offset = newOffset;
             topic = consumerRecord.topic();
             headers = consumerRecord.headers();
         }
@@ -184,7 +187,7 @@ public int hashCode() {
         private final List<Long> punctuatedAt = new LinkedList<>();
 
         @Override
-        public void punctuate(long timestamp) {
+        public void punctuate(final long timestamp) {
             punctuatedAt.add(timestamp);
         }
     }
@@ -202,7 +205,7 @@ public void punctuate(long timestamp) {
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             initialized = true;
             this.context = context;
             for (final Punctuation punctuation : punctuations) {
@@ -211,7 +214,7 @@ public void init(ProcessorContext context) {
         }
 
         @Override
-        public void process(Object key, Object value) {
+        public void process(final Object key, final Object value) {
             processedRecords.add(new Record(key, value, context.headers(), 
context.timestamp(), context.offset(), context.topic()));
             context.forward(key, value);
         }
@@ -228,7 +231,7 @@ public void close() {
         private final Collection<Punctuation> punctuations;
 
         private MockProcessorSupplier() {
-            this(Collections.<Punctuation>emptySet());
+            this(Collections.emptySet());
         }
 
         private MockProcessorSupplier(final Collection<Punctuation> 
punctuations) {
@@ -391,8 +394,7 @@ public void shouldSetRecordMetadata() {
         assertEquals(1, processedRecords.size());
 
         final Record record = processedRecords.get(0);
-        final Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        final Record expectedResult = new Record(consumerRecord1, 0L);
 
         assertThat(record, equalTo(expectedResult));
     }
@@ -410,8 +412,7 @@ public void shouldSendRecordViaCorrectSourceTopic() {
         assertEquals(0, processedRecords2.size());
 
         Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        Record expectedResult = new Record(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         testDriver.pipeInput(consumerRecord2);
@@ -420,8 +421,7 @@ public void shouldSendRecordViaCorrectSourceTopic() {
         assertEquals(1, processedRecords2.size());
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2);
-        expectedResult.offset = 0L;
+        expectedResult = new Record(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -439,7 +439,7 @@ public void shouldUseSourceSpecificDeserializers() {
         topology.addSink(
             "sink",
             SINK_TOPIC_1,
-            new Serializer() {
+            new Serializer<Object>() {
                 @Override
                 public byte[] serialize(final String topic, final Object data) 
{
                     if (data instanceof Long) {
@@ -452,7 +452,7 @@ public void close() {}
                 @Override
                 public void configure(final Map configs, final boolean isKey) 
{}
             },
-            new Serializer() {
+            new Serializer<Object>() {
                 @Override
                 public byte[] serialize(final String topic, final Object data) 
{
                     if (data instanceof String) {
@@ -560,13 +560,11 @@ public void shouldProcessConsumerRecordList() {
         assertEquals(1, processedRecords2.size());
 
         Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        Record expectedResult = new Record(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2);
-        expectedResult.offset = 0L;
+        expectedResult = new Record(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -601,8 +599,7 @@ public void shouldPopulateGlobalStore() {
         assertEquals(1, processedRecords.size());
 
         final Record record = processedRecords.get(0);
-        final Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        final Record expectedResult = new Record(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -687,13 +684,14 @@ public void shouldPunctuateOnWallClockTime() {
     @Test
     public void shouldReturnAllStores() {
         final Topology topology = setupSourceSinkTopology();
+        topology.addProcessor("processor", () -> null);
         topology.addStateStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("store"),
                 Serdes.ByteArray(),
                 Serdes.ByteArray(),
-                new SystemTime())
-                .withLoggingDisabled());
+                new SystemTime()),
+            "processor");
         topology.addGlobalStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("globalStore"),
@@ -705,12 +703,41 @@ public void shouldReturnAllStores() {
             Serdes.ByteArray().deserializer(),
             "globalTopicName",
             "globalProcessorName",
-            new ProcessorSupplier() {
-                @Override
-                public Processor get() {
-                    return null;
-                }
-            });
+            () -> null);
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        final Set<String> expectedStoreNames = new HashSet<>();
+        expectedStoreNames.add("store");
+        expectedStoreNames.add("globalStore");
+        final Map<String, StateStore> allStores = 
testDriver.getAllStateStores();
+        assertThat(allStores.keySet(), equalTo(expectedStoreNames));
+        for (final StateStore store : allStores.values()) {
+            assertNotNull(store);
+        }
+    }
+
+    @Test
+    public void shouldReturnAllStoresNames() {
+        final Topology topology = setupSourceSinkTopology();
+        topology.addStateStore(
+            new KeyValueStoreBuilder<>(
+                Stores.inMemoryKeyValueStore("store"),
+                Serdes.ByteArray(),
+                Serdes.ByteArray(),
+                new SystemTime()));
+        topology.addGlobalStore(
+            new KeyValueStoreBuilder<>(
+                Stores.inMemoryKeyValueStore("globalStore"),
+                Serdes.ByteArray(),
+                Serdes.ByteArray(),
+                new SystemTime()).withLoggingDisabled(),
+            "sourceProcessorName",
+            Serdes.ByteArray().deserializer(),
+            Serdes.ByteArray().deserializer(),
+            "globalTopicName",
+            "globalProcessorName",
+            () -> null);
 
         testDriver = new TopologyTestDriver(topology, config);
 
@@ -721,13 +748,13 @@ public Processor get() {
     }
 
     private void setup() {
-        Topology topology = new Topology();
+        final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");
         topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
         topology.addStateStore(Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("aggStore"),
             Serdes.String(),
-            Serdes.Long()).withLoggingDisabled(), // need to disable logging 
to allow store pre-populating
+            Serdes.Long()),
             "aggregator");
         topology.addSink("sinkProcessor", "result-topic", "aggregator");
 
@@ -812,18 +839,8 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         @Override
         public void init(final ProcessorContext context) {
             this.context = context;
-            context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new 
Punctuator() {
-                @Override
-                public void punctuate(final long timestamp) {
-                    flushStore();
-                }
-            });
-            context.schedule(10000, PunctuationType.STREAM_TIME, new 
Punctuator() {
-                @Override
-                public void punctuate(final long timestamp) {
-                    flushStore();
-                }
-            });
+            context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, timestamp 
-> flushStore());
+            context.schedule(10000, PunctuationType.STREAM_TIME, timestamp -> 
flushStore());
             store = (KeyValueStore<String, Long>) 
context.getStateStore("aggStore");
         }
 
@@ -908,7 +925,7 @@ public void shouldFeedStoreFromGlobalKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.globalTable("topic",
             Consumed.with(Serdes.String(), Serdes.String()),
-            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("globalStore"));
+            Materialized.as("globalStore"));
         try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build(), config)) {
             final KeyValueStore<String, String> globalStore = 
testDriver.getKeyValueStore("globalStore");
             Assert.assertNotNull(globalStore);
@@ -956,8 +973,7 @@ public void 
shouldProcessFromSourcesThatMatchMultiplePattern() {
         assertEquals(0, processedRecords2.size());
 
         final Record record1 = processedRecords1.get(0);
-        final Record expectedResult1 = new Record(consumerRecord1);
-        expectedResult1.offset = 0L;
+        final Record expectedResult1 = new Record(consumerRecord1, 0L);
         assertThat(record1, equalTo(expectedResult1));
 
         testDriver.pipeInput(consumerRecord2);
@@ -966,8 +982,7 @@ public void 
shouldProcessFromSourcesThatMatchMultiplePattern() {
         assertEquals(1, processedRecords2.size());
 
         final Record record2 = processedRecords2.get(0);
-        final Record expectedResult2 = new Record(consumerRecord2);
-        expectedResult2.offset = 0L;
+        final Record expectedResult2 = new Record(consumerRecord2, 0L);
         assertThat(record2, equalTo(expectedResult2));
     }
 
@@ -1004,7 +1019,7 @@ public void 
shouldThrowPatternNotValidForTopicNameException() {
         try {
             testDriver.pipeInput(consumerRecord1);
         } catch (final TopologyException exception) {
-            String str =
+            final String str =
                     String.format(
                             "Invalid topology: Topology add source of type 
String for topic: %s cannot contain regex pattern for " +
                                     "input record topic: %s and hence cannot 
process the message.",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> TopologyTestDriver does not allow pre-populating state stores that have 
> change logging
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6967
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6967
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 2.0.0
>
>
> TopologyTestDriver does not allow pre-populating a state store that has 
> logging enabled. If you try to do it, you will get the following error 
> message:
>  
> {code:java}
> java.lang.IllegalStateException: This should not happen as timestamp() should 
> only be called while a record is processed
>       at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
>       at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
>       at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> {code}
> Also see:
> https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java#L723-L740



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to