This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new c6a9abe KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923) c6a9abe is described below commit c6a9abe3fad0cc92314ad272d9f7a2ee792bc2c8 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Nov 20 14:39:12 2018 -0800 KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923) In the TopologyTestDriver constructor, set a non-null topic (the NONEXIST_TOPIC placeholder instead of null) on the initial record context; in the unit test, intentionally turn on caching and pre-populate the state store to verify this case. Reviewers: Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../processor/internals/AbstractProcessorContext.java | 2 +- .../org/apache/kafka/streams/TopologyTestDriver.java | 4 ++-- .../apache/kafka/streams/TopologyTestDriverTest.java | 17 +++++++++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 0c3fcf2..62e0936 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -33,7 +33,7 @@ import java.util.Objects; public abstract class AbstractProcessorContext implements InternalProcessorContext { - static final String NONEXIST_TOPIC = "__null_topic__"; + public static final String NONEXIST_TOPIC = "__null_topic__"; private final TaskId taskId; private final String applicationId; private final StreamsConfig config; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 245a6fa..96904c2 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -316,7 +316,7 @@ public class TopologyTestDriver implements Closeable { new LogContext() ); globalStateTask.initialize(); - globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders())); + globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders())); } else { globalStateManager = null; globalStateTask = null; @@ -342,7 +342,7 @@ public class TopologyTestDriver implements Closeable { task.initializeStateStores(); task.initializeTopology(); context = (InternalProcessorContext) task.context(); - context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders())); + context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders())); } else { task = null; context = null; diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index d0d4ed1..8400e87 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -888,6 +888,23 @@ public class TopologyTestDriverTest { } @Test + public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() { + final Topology topology = new Topology(); + topology.addSource("sourceProcessor", "input-topic"); + topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor"); + topology.addStateStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("aggStore"), + Serdes.String(), + Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve better test coverage + "aggregator"); + + testDriver = new TopologyTestDriver(topology, config); 
+ + store = testDriver.getKeyValueStore("aggStore"); + store.put("a", 21L); + } + + @Test public void shouldCleanUpPersistentStateStoresOnClose() { final Topology topology = new Topology(); topology.addSource("sourceProcessor", "input-topic");