This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 2dd3713b2a1 KAFKA-14172: Should clear cache when active recycled from standby (#13369)
2dd3713b2a1 is described below

commit 2dd3713b2a183bb904651c0022c5d953970b4ad3
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Apr 5 16:05:11 2023 -0700

    KAFKA-14172: Should clear cache when active recycled from standby (#13369)
    
    This fix is inspired by #12540.
    
    1. Added a clearCache function for CachedStateStore, which would be triggered upon recycling a state manager.
    2. Added the integration test inherited from #12540.
    3. Improved some log4j entries.
    4. Found and fixed a minor issue with log4j prefix.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
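    
    In outline, the fix hooks cache clearing into the state manager's recycle
    path. The following is a minimal sketch of that flow with simplified,
    placeholder names (stores() in particular is not a real method here; the
    actual signatures appear in the diff below):
    
        // When a task is recycled (standby <-> active), the state manager
        // unregisters its changelogs and clears every caching layer, since
        // restoration may have written to the underlying stores while
        // bypassing the cache (KAFKA-14172).
        void recycle() {
            changelogReader.unregister(getAllChangelogTopicPartitions());
            for (final StateStore store : stores()) {         // placeholder accessor
                if (store instanceof CachedStateStore) {
                    ((CachedStateStore) store).clearCache();  // drop stale entries
                }
            }
        }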
---
 .../processor/internals/ActiveTaskCreator.java     |   4 +-
 .../processor/internals/ProcessorStateManager.java |  20 +-
 .../processor/internals/StandbyTaskCreator.java    |   2 +-
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../streams/state/internals/CachedStateStore.java  |  10 +
 .../state/internals/CachingKeyValueStore.java      |  12 +
 .../state/internals/CachingSessionStore.java       |   5 +
 .../state/internals/CachingWindowStore.java        |   5 +
 .../kafka/streams/state/internals/NamedCache.java  |   7 +
 .../kafka/streams/state/internals/ThreadCache.java |   8 +
 .../internals/TimeOrderedCachingWindowStore.java   |   5 +
 .../streams/state/internals/WrappedStateStore.java |   8 +
 ...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 +++++++++++++++++++++
 .../internals/ProcessorStateManagerTest.java       |  29 ++
 14 files changed, 411 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 581175e2c97..7f423c84069 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -226,7 +226,7 @@ class ActiveTaskCreator {
         }
 
         standbyTask.prepareRecycle();
-        standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE);
+        standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE, getLogContext(standbyTask.id));
 
         final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology);
         final StreamTask task = new StreamTask(
@@ -324,7 +324,7 @@ class ActiveTaskCreator {
 
     private LogContext getLogContext(final TaskId taskId) {
         final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
-        final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId);
+        final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "stream-task", taskId);
         return new LogContext(logPrefix);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6a66e428ea5..746bb02db29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -588,14 +588,30 @@ public class ProcessorStateManager implements StateManager {
             final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
             changelogReader.unregister(allChangelogs);
         }
+
+        // when the state manager is recycled to be used, future writes may bypass its store's caching
+        // layer if they are from restoration, hence we need to clear the state store's caches just in case
+        // See KAFKA-14172 for details
+        if (!stores.isEmpty()) {
+            log.debug("Clearing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                if (store instanceof CachedStateStore) {
+                    ((CachedStateStore) store).clearCache();
+                }
+                log.trace("Cleared cache {}", store.name());
+            }
+        }
     }
 
-    void transitionTaskType(final TaskType newType) {
+    void transitionTaskType(final TaskType newType, final LogContext logContext) {
         if (taskType.equals(newType)) {
             throw new IllegalStateException("Tried to recycle state for task type conversion but new type was the same.");
         }
 
         taskType = newType;
+        log = logContext.logger(ProcessorStateManager.class);
     }
 
     @Override
@@ -639,7 +655,7 @@ public class ProcessorStateManager implements StateManager {
             }
         }
 
-        log.debug("Writing checkpoint: {}", checkpointingOffsets);
+        log.debug("Writing checkpoint: {} for task {}", checkpointingOffsets, 
taskId);
         try {
             checkpointFile.write(checkpointingOffsets);
         } catch (final IOException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 46ad4c8909a..0f11d41e18d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -123,7 +123,7 @@ class StandbyTaskCreator {
         }
 
         streamTask.prepareRecycle();
-        streamTask.stateMgr.transitionTaskType(Task.TaskType.STANDBY);
+        streamTask.stateMgr.transitionTaskType(Task.TaskType.STANDBY, getLogContext(streamTask.id));
 
         final StandbyTask task = new StandbyTask(
             streamTask.id,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 874f1993c19..30c3d3121c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -834,7 +834,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
                 changelogMetadata.restoreEndOffset = Math.min(endOffset, committedOffset);
 
-                log.debug("End offset for changelog {} initialized as {}.", partition, changelogMetadata.restoreEndOffset);
+                log.info("End offset for changelog {} initialized as {}.", partition, changelogMetadata.restoreEndOffset);
             } else {
                 if (!newPartitionsToRestore.remove(changelogMetadata)) {
                     throw new IllegalStateException("New changelogs to restore " + newPartitionsToRestore +
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
index 37758e17ba6..9bfaa5335cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java
@@ -32,4 +32,14 @@ public interface CachedStateStore<K, V> {
      * TODO: this is a hacky workaround for now, should be removed when we decouple caching with emitting
      */
     void flushCache();
+
+    /**
+     * Clear the cache; this is used if the underlying store could be updated directly,
+     * hence making the cache out of date.
+     * Please note this call does not try to flush the cache; instead it assumes the cache
+     * itself has been flushed completely.
+     *
+     * TODO: this is a hacky workaround for now, should be removed when we decouple caching with emitting
+     */
+    void clearCache();
 }
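
To make the flushCache()/clearCache() contract above concrete: flushing pushes dirty entries down into the underlying store, while clearing discards them on the assumption that the cache was already flushed and the underlying store has since been updated directly (as during restoration). A self-contained toy illustration of the hazard, using plain maps rather than Kafka's classes:

    import java.util.HashMap;
    import java.util.Map;

    public class StaleCacheDemo {
        public static void main(final String[] args) {
            final Map<String, String> underlying = new HashMap<>(); // stands in for the RocksDB store
            final Map<String, String> cache = new HashMap<>();      // stands in for the record cache

            cache.put("k", "v1");            // cached write
            underlying.putAll(cache);        // "flush": dirty entries pushed down

            underlying.put("k", "v2");       // restoration writes bypass the cache

            // Flushing again would overwrite the restored v2 with the stale v1;
            // clearing is the safe operation here.
            cache.clear();
            System.out.println(cache.getOrDefault("k", underlying.get("k"))); // prints v2
        }
    }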
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 04f2a0c6230..8ba3052a9f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -482,6 +482,18 @@ public class CachingKeyValueStore
         }
     }
 
+    @Override
+    public void clearCache() {
+        validateStoreOpen();
+        lock.writeLock().lock();
+        try {
+            validateStoreOpen();
+            context.cache().clear(cacheName);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     @Override
     public void close() {
         lock.writeLock().lock();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index cff10da5f87..b8d2bf738b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -334,6 +334,11 @@ class CachingSessionStore
         context.cache().flush(cacheName);
     }
 
+    @Override
+    public void clearCache() {
+        context.cache().clear(cacheName);
+    }
+
     public void close() {
         final LinkedList<RuntimeException> suppressed = executeAll(
             () -> context.cache().flush(cacheName),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 5477e57c713..40001fc39da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -425,6 +425,11 @@ class CachingWindowStore
         context.cache().flush(cacheName);
     }
 
+    @Override
+    public void clearCache() {
+        context.cache().clear(cacheName);
+    }
+
     @Override
     public synchronized void close() {
         final LinkedList<RuntimeException> suppressed = executeAll(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 8b8365ea351..c4119906dd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -361,6 +361,13 @@ class NamedCache {
         streamsMetrics.removeAllCacheLevelSensors(Thread.currentThread().getName(), taskName, storeName);
     }
 
+    synchronized void clear() {
+        head = tail = null;
+        currentSizeBytes = 0;
+        dirtyKeys.clear();
+        cache.clear();
+    }
+
     /**
      * A simple wrapper class to implement a doubly-linked list around MemoryLRUCacheBytesEntry
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 60a934e6b46..66cf008af92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -280,6 +280,14 @@ public class ThreadCache {
         }
     }
 
+    synchronized void clear(final String namespace) {
+        final NamedCache cleared = caches.get(namespace);
+        if (cleared != null) {
+            sizeInBytes.getAndAdd(-cleared.sizeInBytes());
+            cleared.clear();
+        }
+    }
+
     private void maybeEvict(final String namespace, final NamedCache cache) {
         int numEvicted = 0;
         while (sizeInBytes.get() > maxCacheSizeBytes) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 62cca5e459c..9c5e13684bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -520,6 +520,11 @@ class TimeOrderedCachingWindowStore
         context.cache().flush(cacheName);
     }
 
+    @Override
+    public void clearCache() {
+        context.cache().clear(cacheName);
+    }
+
     @Override
     public synchronized void close() {
         final LinkedList<RuntimeException> suppressed = executeAll(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index b3e0c767eb5..ff80733c407 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -77,6 +77,14 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         }
     }
 
+    @Override
+    public void clearCache() {
+        if (wrapped instanceof CachedStateStore) {
+            ((CachedStateStore) wrapped).clearCache();
+        }
+    }
+
+
     @Override
     public String name() {
         return wrapped.name();
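
A note on the WrappedStateStore change above: clearCache() only propagates when the wrapped inner store is itself a CachedStateStore, which keeps the call a safe no-op on arbitrary wrapper stacks. The same delegation pattern in a generic, self-contained form (illustrative types, not Kafka's classes):

    // Generic sketch of conditional delegation through a wrapper stack.
    interface Cached { void clearCache(); }
    interface Store { String name(); }

    class Wrapper implements Store, Cached {
        private final Store wrapped;
        Wrapper(final Store wrapped) { this.wrapped = wrapped; }
        @Override public String name() { return wrapped.name(); }
        @Override public void clearCache() {
            // Delegate only if the inner store actually has a cache; otherwise no-op.
            if (wrapped instanceof Cached) {
                ((Cached) wrapped).clearCache();
            }
        }
    }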
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
new file mode 100644
index 00000000000..c29475579bd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@Category(IntegrationTest.class)
+public class StandbyTaskEOSMultiRebalanceIntegrationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandbyTaskEOSMultiRebalanceIntegrationTest.class);
+
+    private final static long TWO_MINUTE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
+
+    private String appId;
+    private String inputTopic;
+    private String storeName;
+    private String counterName;
+
+    private String outputTopic;
+
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+    private KafkaStreams streamInstanceThree;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws Exception {
+        final String safeTestName = UUID.randomUUID().toString();
+        appId = "app-" + safeTestName;
+        inputTopic = "input-" + safeTestName;
+        outputTopic = "output-" + safeTestName;
+        storeName = "store-" + safeTestName;
+        counterName = "counter-" + safeTestName;
+
+        CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
+        CLUSTER.createTopic(inputTopic, partitionCount, 3);
+        CLUSTER.createTopic(outputTopic, partitionCount, 3);
+    }
+
+    private final int partitionCount = 12;
+
+    @After
+    public void cleanUp() {
+        if (streamInstanceOne != null) {
+            streamInstanceOne.close();
+        }
+        if (streamInstanceTwo != null) {
+            streamInstanceTwo.close();
+        }
+        if (streamInstanceThree != null) {
+            streamInstanceThree.close();
+        }
+    }
+
+    // The test produces a duplicate-free range of integers from 0 (inclusive) to initialBulk + secondBulk (exclusive) as input to the stream.
+    // The stream is responsible for assigning a unique id to each of the integers.
+    // The output topic must thus contain 63000 messages:
+    //      The Key is unique and from the range of input values
+    //      The Values produced are unique.
+    @Test
+    public void shouldHonorEOSWhenUsingCachingAndStandbyReplicas() throws Exception {
+        final Properties readCommitted = new Properties();
+        readCommitted.setProperty("isolation.level", "read_committed");
+        final long time = System.currentTimeMillis();
+        final String base = TestUtils.tempDirectory(appId).getPath();
+
+        final int initialBulk = 3000;
+        final int secondBulk = 60000;
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                IntStream.range(0, initialBulk).boxed().map(i -> new KeyValue<>(i, i)).collect(Collectors.toList()),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        IntegerSerializer.class,
+                        IntegerSerializer.class,
+                        new Properties()
+                ),
+                10L + time
+        );
+
+        streamInstanceOne = buildWithUniqueIdAssignmentTopology(base + "-1");
+        streamInstanceTwo = buildWithUniqueIdAssignmentTopology(base + "-2");
+        streamInstanceThree = buildWithUniqueIdAssignmentTopology(base + "-3");
+
+        LOG.info("start first instance and wait for completed processing");
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        UUID.randomUUID().toString(),
+                        IntegerDeserializer.class,
+                        IntegerDeserializer.class,
+                        readCommitted
+                ),
+                outputTopic,
+                initialBulk
+        );
+        LOG.info("Finished reading the initial bulk");
+
+        LOG.info("start second instance and wait for standby replication");
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
+        waitForCondition(
+                () -> streamInstanceTwo.store(
+                        StoreQueryParameters.fromNameAndType(
+                                storeName,
+                                QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                        ).enableStaleStores()
+                ).get(0) != null,
+                TWO_MINUTE_TIMEOUT,
+                "Could not get key from standby store"
+        );
+        LOG.info("Second stream have some data in the state store");
+
+
+        LOG.info("Produce the second bulk");
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                IntStream.range(initialBulk, initialBulk + secondBulk).boxed().map(i -> new KeyValue<>(i, i)).collect(Collectors.toList()),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        IntegerSerializer.class,
+                        IntegerSerializer.class,
+                        new Properties()
+                ),
+                1000L + time
+        );
+
+        LOG.info("Start stream three which will introduce a re-balancing event 
and hopefully some redistribution of tasks.");
+        
startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceThree),
 Duration.ofSeconds(90));
+
+        LOG.info("Wait for the processing to be completed");
+        final List<ConsumerRecord<Integer, Integer>> outputRecords = IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        UUID.randomUUID().toString(),
+                        IntegerDeserializer.class,
+                        IntegerDeserializer.class,
+                        readCommitted
+                ),
+                outputTopic,
+                initialBulk + secondBulk,
+                Duration.ofMinutes(10L).toMillis()
+        );
+        LOG.info("Processing completed");
+
+        outputRecords.stream().collect(Collectors.groupingBy(ConsumerRecord::value)).forEach(this::logIfDuplicate);
+
+        assertThat("Each output should correspond to one distinct value", 
outputRecords.stream().map(ConsumerRecord::value).distinct().count(), 
is(Matchers.equalTo((long) outputRecords.size())));
+    }
+
+    private void logIfDuplicate(final Integer id, final List<ConsumerRecord<Integer, Integer>> record) {
+        assertThat("The id and the value in the records must match", record.stream().allMatch(r -> id.equals(r.value())));
+        if (record.size() > 1) {
+            LOG.warn("Id : " + id + " is assigned to the following " + record.stream().map(ConsumerRecord::key).collect(Collectors.toList()));
+        }
+        }
+    }
+
+    private KafkaStreams buildWithUniqueIdAssignmentTopology(final String stateDirPath) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.addStateStore(Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(storeName),
+                Serdes.Integer(),
+                Serdes.Integer())
+        );
+        builder.addStateStore(Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(counterName),
+                Serdes.Integer(),
+                Serdes.Integer()).withCachingEnabled()
+        );
+        builder.<Integer, Integer>stream(inputTopic)
+                .process(
+                        () -> new Processor<Integer, Integer, Integer, Integer>() {
+                            private KeyValueStore<Integer, Integer> store;
+                            private KeyValueStore<Integer, Integer> counter;
+                            private ProcessorContext<Integer, Integer> context;
+
+                            @Override
+                            public void init(final ProcessorContext<Integer, Integer> context) {
+                                this.context = context;
+                                store = context.getStateStore(storeName);
+                                counter = context.getStateStore(counterName);
+                            }
+
+                            @Override
+                            public void process(final Record<Integer, Integer> record) {
+                                final Integer key = record.key();
+                                final Integer unused = record.value();
+                                assertThat("Key and value mus be equal", 
key.equals(unused));
+                                Integer id = store.get(key);
+                                // Only assign a new id if the value has not been observed before
+                                if (id == null) {
+                                    final int counterKey = 0;
+                                    final Integer lastCounter = counter.get(counterKey);
+                                    final int newCounter = lastCounter == null ? 0 : lastCounter + 1;
+                                    counter.put(counterKey, newCounter);
+                                    // Partitions assign ids from their own id space
+                                    id = newCounter * partitionCount + context.recordMetadata().get().partition();
+                                    store.put(key, id);
+                                }
+                                context.forward(record.withKey(id));
+                            }
+
+                            @Override
+                            public void close() {
+                            }
+                        },
+                        storeName, counterName
+                )
+                .to(outputTopic);
+        return new KafkaStreams(builder.build(), props(stateDirPath));
+    }
+
+
+    private Properties props(final String stateDirPath) {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
+        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
+
+        streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return streamsConfiguration;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index acb7f0da073..ac30337335b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils;
 import org.apache.kafka.test.MockKeyValueStore;
@@ -371,6 +372,32 @@ public class ProcessorStateManagerTest {
         assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
     }
 
+    @Test
+    public void shouldClearStoreCache() {
+        final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
+        reset(storeMetadata);
+        final CachingStore store = EasyMock.createMock(CachingStore.class);
+        expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
+        expect(storeMetadata.store()).andStubReturn(store);
+        expect(store.name()).andStubReturn(persistentStoreName);
+        store.clearCache();
+        expectLastCall().once();
+
+        context.uninitialize();
+        store.init((StateStoreContext) context, store);
+        replay(storeMetadata, context, store);
+
+        stateMgr.registerStateStores(singletonList(store), context);
+
+        stateMgr.registerStore(store, noopStateRestoreCallback, null);
+        assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+
+        stateMgr.recycle();
+        assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
+        assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
+        verify(context, store);
+    }
+
     @Test
     public void shouldRegisterPersistentStores() {
         final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
@@ -1200,4 +1227,6 @@ public class ProcessorStateManagerTest {
             super(name, persistent);
         }
     }
+
+    interface CachingStore extends CachedStateStore, StateStore { }
 }
