This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
     new 2dd3713b2a1 KAFKA-14172: Should clear cache when active recycled from standby (#13369)
2dd3713b2a1 is described below

commit 2dd3713b2a183bb904651c0022c5d953970b4ad3
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Apr 5 16:05:11 2023 -0700

    KAFKA-14172: Should clear cache when active recycled from standby (#13369)

    This fix is inspired by #12540:

    1. Added a clearCache function for CachedStateStore, which is triggered upon recycling a state manager.
    2. Added the integration test inherited from #12540.
    3. Improved some log4j entries.
    4. Found and fixed a minor issue with the log4j prefix.

    Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
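For context before the diff, here is a minimal, self-contained sketch (plain Java, not Kafka Streams code; all names are hypothetical) of the failure mode this commit addresses: writes that reach a store directly during restoration bypass the caching layer, so entries cached before the task changed type go stale unless the cache is cleared on recycle.

    import java.util.HashMap;
    import java.util.Map;

    public class StaleCacheSketch {
        static class CachingStore {
            private final Map<String, String> underlying = new HashMap<>();
            private final Map<String, String> cache = new HashMap<>();

            void put(String key, String value) {       // normal write path goes through the cache
                cache.put(key, value);
                underlying.put(key, value);
            }

            void restore(String key, String value) {   // restoration writes bypass the cache
                underlying.put(key, value);
            }

            String get(String key) {                   // reads prefer the cache
                return cache.containsKey(key) ? cache.get(key) : underlying.get(key);
            }

            void clearCache() {                        // the fix: drop cached entries on recycle
                cache.clear();
            }
        }

        public static void main(String[] args) {
            CachingStore store = new CachingStore();
            store.put("k", "v1");                  // cached as v1 while the task is active
            store.restore("k", "v2");              // as a standby, restoration writes v2 directly
            System.out.println(store.get("k"));    // prints v1 -- stale read after recycling to active
            store.clearCache();
            System.out.println(store.get("k"));    // prints v2 once the cache is cleared
        }
    }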
---
 .../processor/internals/ActiveTaskCreator.java     |   4 +-
 .../processor/internals/ProcessorStateManager.java |  20 +-
 .../processor/internals/StandbyTaskCreator.java    |   2 +-
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../streams/state/internals/CachedStateStore.java  |  10 +
 .../state/internals/CachingKeyValueStore.java      |  12 +
 .../state/internals/CachingSessionStore.java       |   5 +
 .../state/internals/CachingWindowStore.java        |   5 +
 .../kafka/streams/state/internals/NamedCache.java  |   7 +
 .../kafka/streams/state/internals/ThreadCache.java |   8 +
 .../internals/TimeOrderedCachingWindowStore.java   |   5 +
 .../streams/state/internals/WrappedStateStore.java |   8 +
 ...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 +++++++++++++++++++++
 .../internals/ProcessorStateManagerTest.java       |  29 ++
 14 files changed, 411 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 581175e2c97..7f423c84069 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -226,7 +226,7 @@ class ActiveTaskCreator {
         }
 
         standbyTask.prepareRecycle();
-        standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE);
+        standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE, getLogContext(standbyTask.id));
 
         final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology);
         final StreamTask task = new StreamTask(
@@ -324,7 +324,7 @@ class ActiveTaskCreator {
 
     private LogContext getLogContext(final TaskId taskId) {
         final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
-        final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId);
+        final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "stream-task", taskId);
         return new LogContext(logPrefix);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6a66e428ea5..746bb02db29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -588,14 +588,30 @@ public class ProcessorStateManager implements StateManager {
             final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
             changelogReader.unregister(allChangelogs);
         }
+
+        // when the state manager is recycled for reuse, future writes may bypass its stores' caching
+        // layer if they come from restoration, hence we need to clear the state stores' caches just in case.
+        // See KAFKA-14172 for details
+        if (!stores.isEmpty()) {
+            log.debug("Clearing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                if (store instanceof CachedStateStore) {
+                    ((CachedStateStore) store).clearCache();
+                }
+                log.trace("Cleared cache {}", store.name());
+            }
+        }
     }
 
-    void transitionTaskType(final TaskType newType) {
+    void transitionTaskType(final TaskType newType, final LogContext logContext) {
         if (taskType.equals(newType)) {
             throw new IllegalStateException("Tried to recycle state for task type conversion but new type was the same.");
         }
 
         taskType = newType;
+        log = logContext.logger(ProcessorStateManager.class);
     }
 
     @Override
@@ -639,7 +655,7 @@ public class ProcessorStateManager implements StateManager {
             }
         }
 
-        log.debug("Writing checkpoint: {}", checkpointingOffsets);
+        log.debug("Writing checkpoint: {} for task {}", checkpointingOffsets, taskId);
         try {
             checkpointFile.write(checkpointingOffsets);
         } catch (final IOException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 46ad4c8909a..0f11d41e18d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -123,7 +123,7 @@ class StandbyTaskCreator {
         }
 
         streamTask.prepareRecycle();
-        streamTask.stateMgr.transitionTaskType(Task.TaskType.STANDBY);
+        streamTask.stateMgr.transitionTaskType(Task.TaskType.STANDBY, getLogContext(streamTask.id));
 
         final StandbyTask task = new StandbyTask(
             streamTask.id,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 874f1993c19..30c3d3121c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -834,7 +834,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
             changelogMetadata.restoreEndOffset = Math.min(endOffset, committedOffset);
 
-            log.debug("End offset for changelog {} initialized as {}.", partition, changelogMetadata.restoreEndOffset);
+            log.info("End offset for changelog {} initialized as {}.", partition, changelogMetadata.restoreEndOffset);
         } else {
             if (!newPartitionsToRestore.remove(changelogMetadata)) {
                 throw new IllegalStateException("New changelogs to restore " + newPartitionsToRestore +
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
index 37758e17ba6..9bfaa5335cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
@@ -32,4 +32,14 @@ public interface CachedStateStore<K, V> {
      * TODO: this is a hacky workaround for now, should be removed when we decouple caching with emitting
      */
     void flushCache();
+
+    /**
+     * Clear the cache; this is used if the underlying store could be updated directly,
+     * which would make the cache out of date.
+     * Please note this call does not try to flush the cache; instead it assumes the cache
+     * itself has been flushed completely.
+     *
+     * TODO: this is a hacky workaround for now, should be removed when we decouple caching with emitting
+     */
+    void clearCache();
 }
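A hedged usage sketch of the contract above: clearCache() deliberately does not flush, so a caller that could still hold dirty entries is expected to flush first. recycleStore below is a hypothetical helper, not part of the Kafka Streams API; it only illustrates the intended ordering (CachedStateStore is the internal interface shown in this diff).

    import org.apache.kafka.streams.state.internals.CachedStateStore;

    final class CacheRecycleSketch {
        // Hypothetical helper illustrating the flush-then-clear ordering the javadoc assumes.
        static void recycleStore(final CachedStateStore<?, ?> store) {
            store.flushCache();  // emit/write dirty entries first (assumed already done on the real recycle path)
            store.clearCache();  // then drop entries that direct restoration writes would make stale
        }
    }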
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 04f2a0c6230..8ba3052a9f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -482,6 +482,18 @@ public class CachingKeyValueStore
         }
     }
 
+    @Override
+    public void clearCache() {
+        validateStoreOpen();
+        lock.writeLock().lock();
+        try {
+            validateStoreOpen();
+            context.cache().clear(cacheName);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     @Override
     public void close() {
         lock.writeLock().lock();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index cff10da5f87..b8d2bf738b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -334,6 +334,11 @@ class CachingSessionStore
         context.cache().flush(cacheName);
     }
 
+    @Override
+    public void clearCache() {
+        context.cache().clear(cacheName);
+    }
+
     public void close() {
         final LinkedList<RuntimeException> suppressed = executeAll(
             () -> context.cache().flush(cacheName),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 5477e57c713..40001fc39da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -425,6 +425,11 @@ class CachingWindowStore
         context.cache().flush(cacheName);
     }
 
+    @Override
+    public void clearCache() {
+        context.cache().clear(cacheName);
+    }
+
     @Override
     public synchronized void close() {
         final LinkedList<RuntimeException> suppressed = executeAll(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 8b8365ea351..c4119906dd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -361,6 +361,13 @@ class NamedCache {
         streamsMetrics.removeAllCacheLevelSensors(Thread.currentThread().getName(), taskName, storeName);
     }
 
+    synchronized void clear() {
+        head = tail = null;
+        currentSizeBytes = 0;
+        dirtyKeys.clear();
+        cache.clear();
+    }
+
     /**
      * A simple wrapper class to implement a doubly-linked list around MemoryLRUCacheBytesEntry
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 60a934e6b46..66cf008af92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -280,6 +280,14 @@ public class ThreadCache {
         }
     }
 
+    synchronized void clear(final String namespace) {
+        final NamedCache cleared = caches.get(namespace);
+        if (cleared != null) {
+            sizeInBytes.getAndAdd(-cleared.sizeInBytes());
+            cleared.clear();
+        }
+    }
+
     private void maybeEvict(final String namespace, final NamedCache cache) {
         int numEvicted = 0;
         while (sizeInBytes.get() > maxCacheSizeBytes) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 62cca5e459c..9c5e13684bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -520,6 +520,11 @@ class TimeOrderedCachingWindowStore
         context.cache().flush(cacheName);
     }
 
+    @Override
+    public void clearCache() {
+        context.cache().clear(cacheName);
+    }
+
     @Override
     public synchronized void close() {
         final LinkedList<RuntimeException> suppressed = executeAll(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index b3e0c767eb5..ff80733c407 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -77,6 +77,14 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         }
     }
 
+    @Override
+    public void clearCache() {
+        if (wrapped instanceof CachedStateStore) {
+            ((CachedStateStore) wrapped).clearCache();
+        }
+    }
+
+
     @Override
     public String name() {
         return wrapped.name();
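Before the new integration test, one note on the ThreadCache change above: the namespace's byte count must be subtracted from the shared counter before the entries are dropped, otherwise the thread-level size used for eviction decisions drifts upward permanently. A simplified, self-contained sketch (hypothetical names and types, not the real classes):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    final class CacheAccountingSketch {
        private final AtomicLong sizeInBytes = new AtomicLong();          // shared across namespaces
        private final Map<String, Long> namespaceSizes = new HashMap<>();

        void put(final String namespace, final long bytes) {
            namespaceSizes.merge(namespace, bytes, Long::sum);
            sizeInBytes.getAndAdd(bytes);
        }

        synchronized void clear(final String namespace) {
            final Long cleared = namespaceSizes.remove(namespace);
            if (cleared != null) {
                sizeInBytes.getAndAdd(-cleared);  // keep the global counter consistent with dropped entries
            }
        }
    }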
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
new file mode 100644
index 00000000000..c29475579bd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@Category(IntegrationTest.class)
+public class StandbyTaskEOSMultiRebalanceIntegrationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandbyTaskEOSMultiRebalanceIntegrationTest.class);
+
+    private final static long TWO_MINUTE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
+
+    private String appId;
+    private String inputTopic;
+    private String storeName;
+    private String counterName;
+
+    private String outputTopic;
+
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+    private KafkaStreams streamInstanceThree;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws Exception {
+        final String safeTestName = UUID.randomUUID().toString();
+        appId = "app-" + safeTestName;
+        inputTopic = "input-" + safeTestName;
+        outputTopic = "output-" + safeTestName;
+        storeName = "store-" + safeTestName;
+        counterName = "counter-" + safeTestName;
+
+        CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
+        CLUSTER.createTopic(inputTopic, partitionCount, 3);
+        CLUSTER.createTopic(outputTopic, partitionCount, 3);
+    }
+
+    private final int partitionCount = 12;
+
+    @After
+    public void cleanUp() {
+        if (streamInstanceOne != null) {
+            streamInstanceOne.close();
+        }
+        if (streamInstanceTwo != null) {
+            streamInstanceTwo.close();
+        }
+        if (streamInstanceThree != null) {
+            streamInstanceThree.close();
+        }
+    }
+
+    // The test produces a duplicate-free range of integers from 0 (inclusive) to initialBulk + secondBulk (exclusive) as input to the stream.
+    // The stream is responsible for assigning a unique id to each of the integers.
+    // The output topic must thus contain 63000 messages:
+    // the keys are unique and come from the range of input values,
+    // and the values produced are unique.
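A standalone sketch of the id scheme the comment above describes (plain Java; the numbers are only illustrative): with P partitions, a per-partition counter c and a partition number p < P map to id = c * P + p. This is collision-free across partitions because p and c can be recovered from the id by modulo and integer division.

    public class IdSpaceSketch {
        public static void main(String[] args) {
            final int partitionCount = 12;
            // Two different partitions with the same counter value never collide:
            System.out.println(5 * partitionCount + 3);                         // 63
            System.out.println(5 * partitionCount + 4);                         // 64
            // And the mapping is invertible, so distinct ids imply distinct (c, p) pairs:
            System.out.println(63 % partitionCount + ", " + 63 / partitionCount);  // 3, 5
        }
    }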
+    @Test
+    public void shouldHonorEOSWhenUsingCachingAndStandbyReplicas() throws Exception {
+        final Properties readCommitted = new Properties();
+        readCommitted.setProperty("isolation.level", "read_committed");
+        final long time = System.currentTimeMillis();
+        final String base = TestUtils.tempDirectory(appId).getPath();
+
+        final int initialBulk = 3000;
+        final int secondBulk = 60000;
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            IntStream.range(0, initialBulk).boxed().map(i -> new KeyValue<>(i, i)).collect(Collectors.toList()),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            10L + time
+        );
+
+        streamInstanceOne = buildWithUniqueIdAssignmentTopology(base + "-1");
+        streamInstanceTwo = buildWithUniqueIdAssignmentTopology(base + "-2");
+        streamInstanceThree = buildWithUniqueIdAssignmentTopology(base + "-3");
+
+        LOG.info("start first instance and wait for completed processing");
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                UUID.randomUUID().toString(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class,
+                readCommitted
+            ),
+            outputTopic,
+            initialBulk
+        );
+        LOG.info("Finished reading the initial bulk");
+
+        LOG.info("start second instance and wait for standby replication");
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
+        waitForCondition(
+            () -> streamInstanceTwo.store(
+                StoreQueryParameters.fromNameAndType(
+                    storeName,
+                    QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                ).enableStaleStores()
+            ).get(0) != null,
+            TWO_MINUTE_TIMEOUT,
+            "Could not get key from standby store"
+        );
+        LOG.info("Second stream instance has some data in the state store");
+
+        LOG.info("Produce the second bulk");
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            IntStream.range(initialBulk, initialBulk + secondBulk).boxed().map(i -> new KeyValue<>(i, i)).collect(Collectors.toList()),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            1000L + time
+        );
+
+        LOG.info("Start stream three which will introduce a rebalancing event and hopefully some redistribution of tasks.");
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceThree), Duration.ofSeconds(90));
+
+        LOG.info("Wait for the processing to be completed");
+        final List<ConsumerRecord<Integer, Integer>> outputRecords = IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                UUID.randomUUID().toString(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class,
+                readCommitted
+            ),
+            outputTopic,
+            initialBulk + secondBulk,
+            Duration.ofMinutes(10L).toMillis()
+        );
+        LOG.info("Processing completed");
+
+        outputRecords.stream().collect(Collectors.groupingBy(ConsumerRecord::value)).forEach(this::logIfDuplicate);
+
+        assertThat(
+            "Each output should correspond to one distinct value",
+            outputRecords.stream().map(ConsumerRecord::value).distinct().count(),
+            is(Matchers.equalTo((long) outputRecords.size()))
+        );
+    }
+
+    private void logIfDuplicate(final Integer id, final List<ConsumerRecord<Integer, Integer>> records) {
+        assertThat("The id and the value in the records must match", records.stream().allMatch(r -> id.equals(r.value())));
+        if (records.size() > 1) {
+            LOG.warn("Id : " + id + " is assigned to the following keys: " + records.stream().map(ConsumerRecord::key).collect(Collectors.toList()));
+        }
+    }
+
+    private KafkaStreams buildWithUniqueIdAssignmentTopology(final String stateDirPath) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(storeName),
+            Serdes.Integer(),
+            Serdes.Integer())
+        );
+        builder.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(counterName),
+            Serdes.Integer(),
+            Serdes.Integer()).withCachingEnabled()
+        );
+        builder.<Integer, Integer>stream(inputTopic)
+            .process(
+                () -> new Processor<Integer, Integer, Integer, Integer>() {
+                    private KeyValueStore<Integer, Integer> store;
+                    private KeyValueStore<Integer, Integer> counter;
+                    private ProcessorContext<Integer, Integer> context;
+
+                    @Override
+                    public void init(final ProcessorContext<Integer, Integer> context) {
+                        this.context = context;
+                        store = context.getStateStore(storeName);
+                        counter = context.getStateStore(counterName);
+                    }
+
+                    @Override
+                    public void process(final Record<Integer, Integer> record) {
+                        final Integer key = record.key();
+                        final Integer unused = record.value();
+                        assertThat("Key and value must be equal", key.equals(unused));
+                        Integer id = store.get(key);
+                        // Only assign a new id if the value has not been observed before
+                        if (id == null) {
+                            final int counterKey = 0;
+                            final Integer lastCounter = counter.get(counterKey);
+                            final int newCounter = lastCounter == null ? 0 : lastCounter + 1;
+                            counter.put(counterKey, newCounter);
+                            // Partitions assign ids from their own id space
+                            id = newCounter * partitionCount + context.recordMetadata().get().partition();
+                            store.put(key, id);
+                        }
+                        context.forward(record.withKey(id));
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                },
+                storeName, counterName
+            )
+            .to(outputTopic);
+        return new KafkaStreams(builder.build(), props(stateDirPath));
+    }
+
+
+    private Properties props(final String stateDirPath) {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
+        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
+
+        streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return streamsConfiguration;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index acb7f0da073..ac30337335b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils;
 import org.apache.kafka.test.MockKeyValueStore;
@@ -371,6 +372,32 @@ public class ProcessorStateManagerTest {
         assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
     }
 
+    @Test
+    public void shouldClearStoreCache() {
+        final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
+        reset(storeMetadata);
+        final CachingStore store = EasyMock.createMock(CachingStore.class);
+        expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
+        expect(storeMetadata.store()).andStubReturn(store);
+        expect(store.name()).andStubReturn(persistentStoreName);
+        store.clearCache();
+        expectLastCall().once();
+
+        context.uninitialize();
+        store.init((StateStoreContext) context, store);
+        replay(storeMetadata, context, store);
+
+        stateMgr.registerStateStores(singletonList(store), context);
+
+        stateMgr.registerStore(store, noopStateRestoreCallback, null);
+        assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+
+        stateMgr.recycle();
+        assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
+        assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
+        verify(context, store);
+    }
+
     @Test
     public void shouldRegisterPersistentStores() {
         final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
@@ -1200,4 +1227,6 @@ public class ProcessorStateManagerTest {
             super(name, persistent);
         }
     }
+
+    interface CachingStore extends CachedStateStore, StateStore { }
 }