This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 2dd3713b2a1 KAFKA-14172: Should clear cache when active recycled from
standby (#13369)
2dd3713b2a1 is described below
commit 2dd3713b2a183bb904651c0022c5d953970b4ad3
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed Apr 5 16:05:11 2023 -0700
KAFKA-14172: Should clear cache when active recycled from standby (#13369)
This fix is inspired by #12540.
1. Added a clearCache function for CachedStateStore, which would be
triggered upon recycling a state manager.
2. Added the integration test inherited from #12540 .
3. Improved some log4j entries.
4. Found and fixed a minor issue with log4j prefix.
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../processor/internals/ActiveTaskCreator.java | 4 +-
.../processor/internals/ProcessorStateManager.java | 20 +-
.../processor/internals/StandbyTaskCreator.java | 2 +-
.../processor/internals/StoreChangelogReader.java | 2 +-
.../streams/state/internals/CachedStateStore.java | 10 +
.../state/internals/CachingKeyValueStore.java | 12 +
.../state/internals/CachingSessionStore.java | 5 +
.../state/internals/CachingWindowStore.java | 5 +
.../kafka/streams/state/internals/NamedCache.java | 7 +
.../kafka/streams/state/internals/ThreadCache.java | 8 +
.../internals/TimeOrderedCachingWindowStore.java | 5 +
.../streams/state/internals/WrappedStateStore.java | 8 +
...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 +++++++++++++++++++++
.../internals/ProcessorStateManagerTest.java | 29 ++
14 files changed, 411 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 581175e2c97..7f423c84069 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -226,7 +226,7 @@ class ActiveTaskCreator {
}
standbyTask.prepareRecycle();
- standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE);
+ standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE,
getLogContext(standbyTask.id));
final RecordCollector recordCollector =
createRecordCollector(standbyTask.id, getLogContext(standbyTask.id),
standbyTask.topology);
final StreamTask task = new StreamTask(
@@ -324,7 +324,7 @@ class ActiveTaskCreator {
private LogContext getLogContext(final TaskId taskId) {
final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
- final String logPrefix = threadIdPrefix + String.format("%s [%s] ",
"task", taskId);
+ final String logPrefix = threadIdPrefix + String.format("%s [%s] ",
"stream-task", taskId);
return new LogContext(logPrefix);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6a66e428ea5..746bb02db29 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -588,14 +588,30 @@ public class ProcessorStateManager implements
StateManager {
final List<TopicPartition> allChangelogs =
getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
+
+ // when the state manager is recycled to be used, future writes may
bypass its store's caching
+ // layer if they are from restoration, hence we need to clear the
state store's caches just in case
+ // See KAFKA-14172 for details
+ if (!stores.isEmpty()) {
+ log.debug("Clearing all store caches registered in the state
manager: {}", stores);
+ for (final StateStoreMetadata metadata : stores.values()) {
+ final StateStore store = metadata.stateStore;
+
+ if (store instanceof CachedStateStore) {
+ ((CachedStateStore) store).clearCache();
+ }
+ log.trace("Cleared cache {}", store.name());
+ }
+ }
}
- void transitionTaskType(final TaskType newType) {
+ void transitionTaskType(final TaskType newType, final LogContext
logContext) {
if (taskType.equals(newType)) {
throw new IllegalStateException("Tried to recycle state for task
type conversion but new type was the same.");
}
taskType = newType;
+ log = logContext.logger(ProcessorStateManager.class);
}
@Override
@@ -639,7 +655,7 @@ public class ProcessorStateManager implements StateManager {
}
}
- log.debug("Writing checkpoint: {}", checkpointingOffsets);
+ log.debug("Writing checkpoint: {} for task {}", checkpointingOffsets,
taskId);
try {
checkpointFile.write(checkpointingOffsets);
} catch (final IOException e) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 46ad4c8909a..0f11d41e18d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -123,7 +123,7 @@ class StandbyTaskCreator {
}
streamTask.prepareRecycle();
- streamTask.stateMgr.transitionTaskType(Task.TaskType.STANDBY);
+ streamTask.stateMgr.transitionTaskType(Task.TaskType.STANDBY,
getLogContext(streamTask.id));
final StandbyTask task = new StandbyTask(
streamTask.id,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 874f1993c19..30c3d3121c1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -834,7 +834,7 @@ public class StoreChangelogReader implements
ChangelogReader {
changelogMetadata.restoreEndOffset = Math.min(endOffset,
committedOffset);
- log.debug("End offset for changelog {} initialized as {}.",
partition, changelogMetadata.restoreEndOffset);
+ log.info("End offset for changelog {} initialized as {}.",
partition, changelogMetadata.restoreEndOffset);
} else {
if (!newPartitionsToRestore.remove(changelogMetadata)) {
throw new IllegalStateException("New changelogs to restore
" + newPartitionsToRestore +
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
index 37758e17ba6..9bfaa5335cb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
@@ -32,4 +32,14 @@ public interface CachedStateStore<K, V> {
* TODO: this is a hacky workaround for now, should be removed when we
decouple caching with emitting
*/
void flushCache();
+
+ /**
+ * Clear the cache; this is used if the underlying store could be updated
directly
+ * and hence making the cache out of date.
+     * Please note this call does not try to flush the cache; instead it
assumes the cache
+ * itself has been flushed completely
+ *
+ * TODO: this is a hacky workaround for now, should be removed when we
decouple caching with emitting
+ */
+ void clearCache();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 04f2a0c6230..8ba3052a9f4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -482,6 +482,18 @@ public class CachingKeyValueStore
}
}
+ @Override
+ public void clearCache() {
+ validateStoreOpen();
+ lock.writeLock().lock();
+ try {
+ validateStoreOpen();
+ context.cache().clear(cacheName);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
@Override
public void close() {
lock.writeLock().lock();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index cff10da5f87..b8d2bf738b9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -334,6 +334,11 @@ class CachingSessionStore
context.cache().flush(cacheName);
}
+ @Override
+ public void clearCache() {
+ context.cache().clear(cacheName);
+ }
+
public void close() {
final LinkedList<RuntimeException> suppressed = executeAll(
() -> context.cache().flush(cacheName),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 5477e57c713..40001fc39da 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -425,6 +425,11 @@ class CachingWindowStore
context.cache().flush(cacheName);
}
+ @Override
+ public void clearCache() {
+ context.cache().clear(cacheName);
+ }
+
@Override
public synchronized void close() {
final LinkedList<RuntimeException> suppressed = executeAll(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 8b8365ea351..c4119906dd7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -361,6 +361,13 @@ class NamedCache {
streamsMetrics.removeAllCacheLevelSensors(Thread.currentThread().getName(),
taskName, storeName);
}
+ synchronized void clear() {
+ head = tail = null;
+ currentSizeBytes = 0;
+ dirtyKeys.clear();
+ cache.clear();
+ }
+
/**
* A simple wrapper class to implement a doubly-linked list around
MemoryLRUCacheBytesEntry
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 60a934e6b46..66cf008af92 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -280,6 +280,14 @@ public class ThreadCache {
}
}
+ synchronized void clear(final String namespace) {
+ final NamedCache cleared = caches.get(namespace);
+ if (cleared != null) {
+ sizeInBytes.getAndAdd(-cleared.sizeInBytes());
+ cleared.clear();
+ }
+ }
+
private void maybeEvict(final String namespace, final NamedCache cache) {
int numEvicted = 0;
while (sizeInBytes.get() > maxCacheSizeBytes) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 62cca5e459c..9c5e13684bc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -520,6 +520,11 @@ class TimeOrderedCachingWindowStore
context.cache().flush(cacheName);
}
+ @Override
+ public void clearCache() {
+ context.cache().clear(cacheName);
+ }
+
@Override
public synchronized void close() {
final LinkedList<RuntimeException> suppressed = executeAll(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index b3e0c767eb5..ff80733c407 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -77,6 +77,14 @@ public abstract class WrappedStateStore<S extends
StateStore, K, V> implements S
}
}
+ @Override
+ public void clearCache() {
+ if (wrapped instanceof CachedStateStore) {
+ ((CachedStateStore) wrapped).clearCache();
+ }
+ }
+
+
@Override
public String name() {
return wrapped.name();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
new file mode 100644
index 00000000000..c29475579bd
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@Category(IntegrationTest.class)
+public class StandbyTaskEOSMultiRebalanceIntegrationTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StandbyTaskEOSMultiRebalanceIntegrationTest.class);
+
+ private final static long TWO_MINUTE_TIMEOUT =
Duration.ofMinutes(2L).toMillis();
+
+ private String appId;
+ private String inputTopic;
+ private String storeName;
+ private String counterName;
+
+ private String outputTopic;
+
+ private KafkaStreams streamInstanceOne;
+ private KafkaStreams streamInstanceTwo;
+ private KafkaStreams streamInstanceThree;
+
+ private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @Before
+ public void createTopics() throws Exception {
+ final String safeTestName = UUID.randomUUID().toString();
+ appId = "app-" + safeTestName;
+ inputTopic = "input-" + safeTestName;
+ outputTopic = "output-" + safeTestName;
+ storeName = "store-" + safeTestName;
+ counterName = "counter-" + safeTestName;
+
+ CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
+ CLUSTER.createTopic(inputTopic, partitionCount, 3);
+ CLUSTER.createTopic(outputTopic, partitionCount, 3);
+ }
+
+ private final int partitionCount = 12;
+
+ @After
+ public void cleanUp() {
+ if (streamInstanceOne != null) {
+ streamInstanceOne.close();
+ }
+ if (streamInstanceTwo != null) {
+ streamInstanceTwo.close();
+ }
+ if (streamInstanceThree != null) {
+ streamInstanceThree.close();
+ }
+ }
+
+    // The test produces a duplicate-free range of integers from 0 (inclusive)
to initialBulk + secondBulk (exclusive) as input to the stream.
+ // The stream is responsible for assigning a unique id to each of the
integers.
+    // The output topic must thus contain 63000 messages:
+ // The Key is unique and from the range of input values
+ // The Values produced are unique.
+ @Test
+ public void shouldHonorEOSWhenUsingCachingAndStandbyReplicas() throws
Exception {
+ final Properties readCommitted = new Properties();
+ readCommitted.setProperty("isolation.level", "read_committed");
+ final long time = System.currentTimeMillis();
+ final String base = TestUtils.tempDirectory(appId).getPath();
+
+ final int initialBulk = 3000;
+ final int secondBulk = 60000;
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ IntStream.range(0, initialBulk).boxed().map(i -> new
KeyValue<>(i, i)).collect(Collectors.toList()),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ IntegerSerializer.class,
+ new Properties()
+ ),
+ 10L + time
+ );
+
+ streamInstanceOne = buildWithUniqueIdAssignmentTopology(base + "-1");
+ streamInstanceTwo = buildWithUniqueIdAssignmentTopology(base + "-2");
+ streamInstanceThree = buildWithUniqueIdAssignmentTopology(base + "-3");
+
+ LOG.info("start first instance and wait for completed processing");
+
startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne),
Duration.ofSeconds(30));
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ UUID.randomUUID().toString(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class,
+ readCommitted
+ ),
+ outputTopic,
+ initialBulk
+ );
+ LOG.info("Finished reading the initial bulk");
+
+ LOG.info("start second instance and wait for standby replication");
+
startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo),
Duration.ofSeconds(30));
+ waitForCondition(
+ () -> streamInstanceTwo.store(
+ StoreQueryParameters.fromNameAndType(
+ storeName,
+ QueryableStoreTypes.<Integer,
Integer>keyValueStore()
+ ).enableStaleStores()
+ ).get(0) != null,
+ TWO_MINUTE_TIMEOUT,
+ "Could not get key from standby store"
+ );
+ LOG.info("Second stream have some data in the state store");
+
+
+ LOG.info("Produce the second bulk");
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ IntStream.range(initialBulk, initialBulk +
secondBulk).boxed().map(i -> new KeyValue<>(i, i)).collect(Collectors.toList()),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ IntegerSerializer.class,
+ new Properties()
+ ),
+ 1000L + time
+ );
+
+ LOG.info("Start stream three which will introduce a re-balancing event
and hopefully some redistribution of tasks.");
+
startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceThree),
Duration.ofSeconds(90));
+
+ LOG.info("Wait for the processing to be completed");
+ final List<ConsumerRecord<Integer, Integer>> outputRecords =
IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ UUID.randomUUID().toString(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class,
+ readCommitted
+ ),
+ outputTopic,
+ initialBulk + secondBulk,
+ Duration.ofMinutes(10L).toMillis()
+ );
+ LOG.info("Processing completed");
+
+
outputRecords.stream().collect(Collectors.groupingBy(ConsumerRecord::value)).forEach(this::logIfDuplicate);
+
+ assertThat("Each output should correspond to one distinct value",
outputRecords.stream().map(ConsumerRecord::value).distinct().count(),
is(Matchers.equalTo((long) outputRecords.size())));
+ }
+
+ private void logIfDuplicate(final Integer id, final
List<ConsumerRecord<Integer, Integer>> record) {
+ assertThat("The id and the value in the records must match",
record.stream().allMatch(r -> id.equals(r.value())));
+ if (record.size() > 1) {
+ LOG.warn("Id : " + id + " is assigned to the following " +
record.stream().map(ConsumerRecord::key).collect(Collectors.toList()));
+ }
+ }
+
+ private KafkaStreams buildWithUniqueIdAssignmentTopology(final String
stateDirPath) {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.addStateStore(Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(storeName),
+ Serdes.Integer(),
+ Serdes.Integer())
+ );
+ builder.addStateStore(Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(counterName),
+ Serdes.Integer(),
+ Serdes.Integer()).withCachingEnabled()
+ );
+ builder.<Integer, Integer>stream(inputTopic)
+ .process(
+ () -> new Processor<Integer, Integer, Integer,
Integer>() {
+ private KeyValueStore<Integer, Integer> store;
+ private KeyValueStore<Integer, Integer> counter;
+ private ProcessorContext<Integer, Integer> context;
+
+ @Override
+ public void init(final ProcessorContext<Integer,
Integer> context) {
+ this.context = context;
+ store = context.getStateStore(storeName);
+ counter = context.getStateStore(counterName);
+ }
+
+ @Override
+ public void process(final Record<Integer, Integer>
record) {
+ final Integer key = record.key();
+ final Integer unused = record.value();
+ assertThat("Key and value mus be equal",
key.equals(unused));
+ Integer id = store.get(key);
+                            // Only assign a new id if the value has not
been observed before
+ if (id == null) {
+ final int counterKey = 0;
+ final Integer lastCounter =
counter.get(counterKey);
+ final int newCounter = lastCounter == null
? 0 : lastCounter + 1;
+ counter.put(counterKey, newCounter);
+ // Partitions assign ids from their own id
space
+ id = newCounter * partitionCount +
context.recordMetadata().get().partition();
+ store.put(key, id);
+ }
+ context.forward(record.withKey(id));
+ }
+
+ @Override
+ public void close() {
+ }
+ },
+ storeName, counterName
+ )
+ .to(outputTopic);
+ return new KafkaStreams(builder.build(), props(stateDirPath));
+ }
+
+
+ private Properties props(final String stateDirPath) {
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
+ streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+ streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
+
+ streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+ streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+
+ return streamsConfiguration;
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index acb7f0da073..ac30337335b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import
org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.test.MockKeyValueStore;
@@ -371,6 +372,32 @@ public class ProcessorStateManagerTest {
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
}
+ @Test
+ public void shouldClearStoreCache() {
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
+ reset(storeMetadata);
+ final CachingStore store = EasyMock.createMock(CachingStore.class);
+
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
+ expect(storeMetadata.store()).andStubReturn(store);
+ expect(store.name()).andStubReturn(persistentStoreName);
+ store.clearCache();
+ expectLastCall().once();
+
+ context.uninitialize();
+ store.init((StateStoreContext) context, store);
+ replay(storeMetadata, context, store);
+
+ stateMgr.registerStateStores(singletonList(store), context);
+
+ stateMgr.registerStore(store, noopStateRestoreCallback, null);
+
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+
+ stateMgr.recycle();
+
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
+ assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
+ verify(context, store);
+ }
+
@Test
public void shouldRegisterPersistentStores() {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
@@ -1200,4 +1227,6 @@ public class ProcessorStateManagerTest {
super(name, persistent);
}
}
+
+ interface CachingStore extends CachedStateStore, StateStore { }
}