This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new c6a9abe  KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
c6a9abe is described below

commit c6a9abe3fad0cc92314ad272d9f7a2ee792bc2c8
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Nov 20 14:39:12 2018 -0800

    KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
    
    In the TopologyTestDriver constructor, set a non-null topic on the initial record context; the unit test intentionally turns on caching to verify this case.
    
    Reviewers: Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../processor/internals/AbstractProcessorContext.java   |  2 +-
 .../org/apache/kafka/streams/TopologyTestDriver.java    |  4 ++--
 .../apache/kafka/streams/TopologyTestDriverTest.java    | 17 +++++++++++++++++
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0c3fcf2..62e0936 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@ import java.util.Objects;
 
 public abstract class AbstractProcessorContext implements 
InternalProcessorContext {
 
-    static final String NONEXIST_TOPIC = "__null_topic__";
+    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 245a6fa..96904c2 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -316,7 +316,7 @@ public class TopologyTestDriver implements Closeable {
                 new LogContext()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new 
RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -342,7 +342,7 @@ public class TopologyTestDriver implements Closeable {
             task.initializeStateStores();
             task.initializeTopology();
             context = (InternalProcessorContext) task.context();
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
null, new RecordHeaders()));
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             task = null;
             context = null;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d0d4ed1..8400e87 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -888,6 +888,23 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+        final Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withCachingEnabled(), // intentionally turn on 
caching to achieve better test coverage
+            "aggregator");
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
+    @Test
     public void shouldCleanUpPersistentStateStoresOnClose() {
         final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");

Reply via email to