This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d7fed9d0235 KAFKA-19709: Add LegacyCheckpointingStateStore (#21554)
d7fed9d0235 is described below

commit d7fed9d02354323fa031f99b758276cef2093afa
Author: Nick Telford <[email protected]>
AuthorDate: Wed Mar 11 20:14:28 2026 +0000

    KAFKA-19709: Add LegacyCheckpointingStateStore (#21554)
    
    Extracts the handling of `.checkpoint` files, which will become a legacy
    feature as part of KIP-1035, to a separate
    `LegacyCheckpointingStateStore`, which will wrap any `StateStore` that
    needs it.
    
    This implementation is currently unused, as the changes that introduce
    its usage will be made as part of a follow-up PR.
    
    A core goal of this implementation is to fully encapsulate
    "checkpointing", and the management of `.checkpoint` files within this
    class. Consequently, there are a few behavioural differences:
    
    1. Checkpointing is now done per-store, rather than per-task.
    2. Since multiple stores can share the same `.checkpoint` file, when we
    checkpoint, we have to read the current `.checkpoint` file and merge our
    new offsets into it, to prevent removing the offsets from other stores.
    3. Decisions about *when* to checkpoint are now taken during `commit`.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../processor/internals/StateDirectory.java        |   2 +-
 .../internals/LegacyCheckpointingStateStore.java   | 309 ++++++++++
 .../LegacyCheckpointingStateStoreTest.java         | 668 +++++++++++++++++++++
 3 files changed, 978 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 5b37c1bf427..f66f177c0ef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -474,7 +474,7 @@ public class StateDirectory implements AutoCloseable {
      * @return directory for the global stores
     * @throws ProcessorStateException if the global store directory does not exist and could not be created
      */
-    File globalStateDir() {
+    public File globalStateDir() {
         final File dir = new File(stateDir, "global");
         if (hasPersistentStores) {
             if (!dir.exists() && !dir.mkdir()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
new file mode 100644
index 00000000000..a5c3a42b3c8
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
+
/**
 * Wraps a persistent {@link StateStore} that does not manage its own offsets, persisting the
 * store's committed changelog offsets to a legacy, per-store {@code .checkpoint} file on disk.
 *
 * <p>Each wrapped store writes its own file, named {@code .checkpoint_&lt;storeName&gt;}, located
 * in the task directory (or in the global state directory for global stores) — see
 * {@link #checkpointFileFor(StateDirectory, TaskId, StateStore)}.
 *
 * <p>Behavioural notes, visible in the implementation below:
 * <ul>
 *     <li>Under EOS, the checkpoint file is deleted during {@link #init}, so that state is wiped
 *     after a crash; the file is only re-written when the store is {@link #close() closed}.</li>
 *     <li>Under ALOS, the file is re-written during {@link #commit} once the accumulated offset
 *     delta since the last checkpoint exceeds {@link #OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT},
 *     which prevents disk thrashing.</li>
 *     <li>A store {@link #markAsCorrupted() marked as corrupted} skips the final checkpoint on
 *     close, forcing a full restore on restart.</li>
 * </ul>
 */
public class LegacyCheckpointingStateStore<S extends StateStore, K, V> extends WrappedStateStore<S, K, V> {
    /** Base name of the checkpoint file; per-store files append {@code _<storeName>}. */
    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private static final Logger log = LoggerFactory.getLogger(LegacyCheckpointingStateStore.class);

    /** Minimum total offset delta since the last checkpoint before the file is re-written under ALOS. */
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    private final boolean eosEnabled;
    private final Set<TopicPartition> changelogPartitions;
    // NOTE(review): currently only read at construction time (via checkpointFileFor) — confirm
    // whether the field is needed, or only the constructor parameter.
    private final StateDirectory stateDirectory;
    private final TaskId taskId;
    private final OffsetCheckpoint checkpointFile;
    private final String logPrefix;

    // Latest committed offsets per changelog partition; a null value means "offset unknown".
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    // Snapshot of 'offsets' as of the last successful checkpoint; null until init() has run.
    private Map<TopicPartition, Long> checkpointedOffsets;
    // When true, close() will NOT write a final checkpoint (see markAsCorrupted()).
    private boolean corrupted = false;

    /**
     * Wraps the given {@link StateStore} as a {@code LegacyCheckpointingStateStore}, only if it is both
     * {@link StateStore#persistent() persistent}, and it does not {@link StateStore#managesOffsets() manage its own offsets}.
     */
    @SuppressWarnings("deprecation")
    public static <S extends StateStore, K, V> StateStore maybeWrapStore(final S wrapped,
                                                                         final boolean eosEnabled,
                                                                         final Set<TopicPartition> changelogPartitions,
                                                                         final StateDirectory stateDirectory,
                                                                         final TaskId taskId,
                                                                         final String logPrefix) {
        return wrapped.persistent() && !wrapped.managesOffsets()
                ? new LegacyCheckpointingStateStore<>(wrapped, eosEnabled, changelogPartitions, stateDirectory, taskId, logPrefix)
                : wrapped;
    }

    /**
     * Unwraps the given store, only if it is a {@code LegacyCheckpointingStateStore}.
     * Any other store is returned unchanged.
     */
    public static StateStore maybeUnwrapStore(final StateStore store) {
        return (store instanceof LegacyCheckpointingStateStore<?, ?, ?>)
                ? ((LegacyCheckpointingStateStore<?, ?, ?>) store).wrapped()
                : store;
    }

    /**
     * Marks the given store as corrupted, only if it is a {@code LegacyCheckpointingStateStore};
     * a no-op for any other store.
     */
    public static void maybeMarkCorrupted(final StateStore store) {
        if (store instanceof LegacyCheckpointingStateStore<?, ?, ?>) {
            ((LegacyCheckpointingStateStore<?, ?, ?>) store).markAsCorrupted();
        }
    }

    /**
     * Migrates offsets stored in a legacy, global/per-task .checkpoint file into the {@code stores}.
     *
     * The {@code stores} <em>MUST</em> manage their own offsets (i.e. {@link #managesOffsets()} must be {@code true}).
     * They can either do this themselves, or be wrapped in a {@link LegacyCheckpointingStateStore} implementation.
     *
     * Once this method successfully returns, the legacy {@code .checkpoint} file for the given {@link TaskId} (or the
     * global checkpoint, when {@code taskId} is {@code null}), will have been migrated and deleted from the filesystem.
     *
     * @param logPrefix Log prefix to use for log messages.
     * @param stateDirectory The singleton {@link StateDirectory} used for looking up existing checkpoint files.
     * @param taskId Either the task ID for regular stores, or {@code null} to migrate global stores.
     * @param stores A {@link Map} of {@link TopicPartition changelog partitions} to their {@link StateStore}. For global
     *               stores, which may have multiple {@link TopicPartition changelog partitions}, stores may appear
     *               multiple times, once for each of its {@link TopicPartition changelog partitions}.
     * @throws ProcessorStateException if reading, committing, or deleting the legacy file fails.
     */
    @SuppressWarnings("deprecation")
    public static void migrateLegacyOffsets(final String logPrefix,
                                            final StateDirectory stateDirectory,
                                            final TaskId taskId,
                                            final Map<TopicPartition, StateStore> stores) {
        // load legacy per-task checkpoint (store == null selects the legacy, shared file)
        final File legacyCheckpointFile = checkpointFileFor(stateDirectory, taskId, null);

        if (legacyCheckpointFile.exists()) {
            log.info("Migrating legacy checkpoint file for task {}", taskId);
            final OffsetCheckpoint legacyCheckpoint = new OffsetCheckpoint(legacyCheckpointFile);

            try {
                // build offsets for each store, grouping legacy entries by the store that owns the partition
                final Map<StateStore, Map<TopicPartition, Long>> storesToMigrate = new HashMap<>();
                for (final Map.Entry<TopicPartition, Long> entry : legacyCheckpoint.read().entrySet()) {
                    final StateStore store = stores.get(entry.getKey());
                    if (store != null) {
                        storesToMigrate.computeIfAbsent(store, k -> new HashMap<>()).put(entry.getKey(), entry.getValue());
                    }
                }

                // commit checkpointed offsets to each store
                for (final Map.Entry<StateStore, Map<TopicPartition, Long>> entry : storesToMigrate.entrySet()) {
                    final StateStore store = entry.getKey();
                    if (!store.managesOffsets()) {
                        log.warn("{}Error migrating legacy checkpoint offsets: StateStore '{}' does not manage its own offsets. " +
                                "The checkpointed offsets for this store will not be migrated, and will be lost. " +
                                "This store will need to fully restore its state on application restart. " +
                                "This is a bug in Kafka Streams, and should never be possible.", logPrefix, store.name());
                    }

                    // attempt to commit the offsets, even if the store doesn't manage them itself
                    store.commit(entry.getValue());
                }

                // delete legacy checkpoint file, so migration is not repeated on the next start
                legacyCheckpoint.delete();

                log.info("Migrated legacy checkpoint file for task {} with offsets migrated for {} stores", taskId, storesToMigrate.size());
            } catch (final IOException | RuntimeException e) {
                throw new ProcessorStateException(String.format("%sError migrating checkpoint file for task '%s'", logPrefix, taskId), e);
            }
        } else {
            log.debug("No legacy checkpoint file found for task {}", taskId);
        }
    }

    /**
     * Creates the wrapper. Package-private: external callers go through
     * {@link #maybeWrapStore(StateStore, boolean, Set, StateDirectory, TaskId, String)}.
     */
    LegacyCheckpointingStateStore(final S wrapped,
                                  final boolean eosEnabled,
                                  final Set<TopicPartition> changelogPartitions,
                                  final StateDirectory stateDirectory,
                                  final TaskId taskId,
                                  final String logPrefix) {
        super(wrapped);
        this.eosEnabled = eosEnabled;
        this.changelogPartitions = changelogPartitions;
        this.stateDirectory = stateDirectory;
        this.taskId = taskId;
        // passing 'this' as the store makes checkpointFileFor select the per-store file variant
        this.checkpointFile = new OffsetCheckpoint(checkpointFileFor(stateDirectory, taskId, this));
        this.logPrefix = logPrefix;
    }

    /**
     * Loads previously checkpointed offsets for this store's changelog partitions, then
     * initializes the wrapped store. Under EOS the checkpoint file is deleted afterwards.
     *
     * @throws ProcessorStateException if the checkpoint file cannot be read or (under EOS) deleted.
     */
    @Override
    public void init(final StateStoreContext stateStoreContext, final StateStore root) {
        // load store offsets from checkpoint file
        try {
            final Map<TopicPartition, Long> allOffsets = checkpointFile.read();
            for (final Map.Entry<TopicPartition, Long> entry : allOffsets.entrySet()) {
                // ignore entries for partitions that are not part of this store's changelog
                if (changelogPartitions.contains(entry.getKey())) {
                    offsets.put(entry.getKey(), changelogOffsetFromCheckpointedOffset(entry.getValue()));
                }
            }
            checkpointedOffsets = new HashMap<>(offsets);
        } catch (final IOException | RuntimeException e) {
            throw new ProcessorStateException(String.format("%sError loading checkpoint file when creating StateStore '%s'", logPrefix, name()), e);
        }

        // initialize the actual store
        super.init(stateStoreContext, root);

        // under EOS, we delete the checkpoint file after everything has been loaded to ensure state is wiped after a crash
        try {
            if (eosEnabled) {
                checkpointFile.delete();
            }
        } catch (final IOException e) {
            throw new ProcessorStateException(String.format("%sError deleting checkpoint file when creating StateStore '%s'", logPrefix, name()), e);
        }
    }

    /**
     * Always {@code true}: this wrapper manages offsets on behalf of the wrapped store, via the
     * legacy {@code .checkpoint} file.
     */
    @Override
    @Deprecated
    public boolean managesOffsets() {
        return true;
    }

    /**
     * @return the last committed offset for {@code partition}, or {@code null} if unknown, or if
     *         the partition is not one of this store's changelog partitions.
     */
    @Override
    public Long committedOffset(final TopicPartition partition) {
        return offsets.get(partition);
    }

    /**
     * Commits to the wrapped store, records the given offsets in memory and, under ALOS,
     * re-writes the checkpoint file once enough new data has accumulated since the last
     * checkpoint.
     */
    @Override
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        super.commit(changelogOffsets);

        // update in-memory offsets
        offsets.putAll(changelogOffsets);

        // only write the checkpoint file if both:
        // 1. in ALOS mode (under EOS, the checkpoint file is only written when closing the store)
        // 2. we have written enough new data to the store to warrant updating the checkpoint (prevents disk thrashing)
        if (!eosEnabled && checkpointNeeded(checkpointedOffsets, offsets)) {
            checkpoint();
        }
    }

    @Override
    public void close() {
        super.close();

        // corrupted stores skip the final checkpoint, so their state is fully restored on restart
        if (!corrupted) {
            checkpoint();
        }
    }

    /**
     * Marks this store as corrupted: {@link #close()} will then skip writing a final checkpoint.
     */
    public void markAsCorrupted() {
        corrupted = true;
    }

    /**
     * "checkpoint" committed offsets to disk.
     * <p>Write failures are logged and swallowed: a stale checkpoint only means more restoration
     * work on restart, not data loss.
     */
    void checkpoint() {
        // only checkpoint persistent and logged stores
        if (persistent() && !changelogPartitions.isEmpty()) {
            try {
                // convert in-memory offsets to their checkpointable form (null -> OFFSET_UNKNOWN sentinel);
                // no merge with the existing file is needed, since each store has its own file
                final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>(offsets.size());
                for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                    checkpointingOffsets.put(entry.getKey(), checkpointableOffsetFromChangelogOffset(entry.getValue()));
                }

                log.debug("Writing checkpoint: {} for task {}", checkpointingOffsets, taskId);
                checkpointFile.write(checkpointingOffsets);
                checkpointedOffsets = new HashMap<>(offsets);
            } catch (final IOException e) {
                log.warn("{}Failed to write offset checkpoint file to [{}]." +
                                " This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory." +
                                " This may also occur due to running multiple instances on the same machine using the same state dir." +
                                " Changing the location of state.dir may resolve the problem.",
                        logPrefix, checkpointFile, e);
            }
        }
    }

    /**
     * Resolves the checkpoint {@link File} for the given task/store combination.
     *
     * @param taskId {@code null} selects the global state directory; otherwise the task directory.
     * @param store  {@code null} selects the legacy, shared {@code .checkpoint} file; otherwise
     *               the per-store {@code .checkpoint_<storeName>} file.
     */
    static File checkpointFileFor(final StateDirectory stateDirectory,
                                  final TaskId taskId,
                                  final StateStore store) {
        return taskId == null ?
                // global store
                (store == null ?
                        // legacy, global file
                        new File(stateDirectory.globalStateDir(), CHECKPOINT_FILE_NAME) :
                        // per-store file
                        new File(stateDirectory.globalStateDir(), CHECKPOINT_FILE_NAME + "_" + store.name())
                ) :
                (store == null ?
                        // legacy, per-task file
                        new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME) :
                        // per-store file
                        new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME + "_" + store.name())
                );
    }

    /**
     * @return whether the total offset delta between the two snapshots is large enough to justify
     *         re-writing the checkpoint file.
     */
    static boolean checkpointNeeded(final Map<TopicPartition, Long> oldOffsetSnapshot,
                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
        // we should always have the old snapshot post completing the register state stores;
        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
        if (oldOffsetSnapshot == null) {
            return false;
        }

        // we can checkpoint if the difference between the current and the previous snapshot is large enough
        long totalOffsetDelta = 0L;
        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
            final Long newOffset = entry.getValue();
            if (newOffset != null) {
                final Long oldOffset = oldOffsetSnapshot.get(entry.getKey());
                // a partition absent from the old snapshot contributes its full offset to the delta
                totalOffsetDelta += newOffset - (oldOffset == null ? 0L : oldOffset);
            }
        }

        // only overwrite the checkpoint if the accumulated delta is strictly greater than the
        // threshold; a delta exactly equal to the threshold does NOT trigger a checkpoint
        return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    // Pass in a sentinel value to checkpoint when the changelog offset is not yet initialized/known
    private static long checkpointableOffsetFromChangelogOffset(final Long offset) {
        return offset != null ? offset : OFFSET_UNKNOWN;
    }

    // Convert the written offsets in the checkpoint file back to the changelog offset
    private static Long changelogOffsetFromCheckpointedOffset(final long offset) {
        return offset != OFFSET_UNKNOWN ? offset : null;
    }
}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
new file mode 100644
index 00000000000..493e437e2c7
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.test.MockKeyValueStore;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
+import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link LegacyCheckpointingStateStore}.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class LegacyCheckpointingStateStoreTest {
+
+    private static final String APPLICATION_ID = "test-application";
+    private static final String LOG_PREFIX = "test-prefix ";
+    private static final String STORE_NAME = "test-store";
+    private static final String CHANGELOG_TOPIC = "test-topic";
+
+    private final TopicPartition partition = new 
TopicPartition(CHANGELOG_TOPIC, 0);
+    private final TaskId taskId = new TaskId(0, 0);
+
+    private File baseDir;
+    private StateDirectory stateDirectory;
+    private MockKeyValueStore persistentStore;
+
+    @Mock
+    private StateStoreContext context;
+
+    @BeforeEach
+    public void setUp() {
+        baseDir = TestUtils.tempDirectory();
+        stateDirectory = new StateDirectory(new StreamsConfig(new Properties() 
{
+            {
+                put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
+            }
+        }), new MockTime(), true, true);
+        persistentStore = new MockKeyValueStore(STORE_NAME, true);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        Utils.delete(baseDir);
+    }
+
+    // =====================================================================
+    // maybeWrapStore()
+    // =====================================================================
+
+    @Test
+    public void shouldWrapPersistentNonOffsetManagingStore() {
+        final StateStore result = LegacyCheckpointingStateStore.maybeWrapStore(
+            persistentStore, false, Set.of(partition), stateDirectory, taskId, 
LOG_PREFIX);
+
+        assertThat(result, instanceOf(LegacyCheckpointingStateStore.class));
+    }
+
+    @Test
+    public void shouldNotWrapNonPersistentStore() {
+        final MockKeyValueStore nonPersistentStore = new 
MockKeyValueStore(STORE_NAME, false);
+
+        final StateStore result = LegacyCheckpointingStateStore.maybeWrapStore(
+            nonPersistentStore, false, Set.of(partition), stateDirectory, 
taskId, LOG_PREFIX);
+
+        assertThat(result, 
not(instanceOf(LegacyCheckpointingStateStore.class)));
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void shouldNotWrapOffsetManagingStore() {
+        final MockKeyValueStore offsetManagingStore = new 
MockKeyValueStore(STORE_NAME, true) {
+            @Override
+            public boolean managesOffsets() {
+                return true;
+            }
+        };
+
+        final StateStore result = LegacyCheckpointingStateStore.maybeWrapStore(
+            offsetManagingStore, false, Set.of(partition), stateDirectory, 
taskId, LOG_PREFIX);
+
+        assertThat(result, 
not(instanceOf(LegacyCheckpointingStateStore.class)));
+    }
+
+    // =====================================================================
+    // maybeUnwrapStore()
+    // =====================================================================
+
+    @Test
+    public void shouldUnwrapLCSSToInnerStore() {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
lcss = createStore(false);
+
+        final StateStore result = 
LegacyCheckpointingStateStore.maybeUnwrapStore(lcss);
+
+        assertEquals(persistentStore, result);
+    }
+
+    @Test
+    public void shouldReturnNonLCSSStoreUnchangedFromMaybeUnwrap() {
+        final StateStore result = 
LegacyCheckpointingStateStore.maybeUnwrapStore(persistentStore);
+
+        assertEquals(persistentStore, result);
+    }
+
+    // =====================================================================
+    // maybeMarkCorrupted()
+    // =====================================================================
+
+    @Test
+    public void shouldMarkLCSSAsCorruptedSoCloseDoesNotWriteCheckpoint() 
throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+
+        LegacyCheckpointingStateStore.maybeMarkCorrupted(store);
+        // commit() won't write checkpoint either: checkpointedOffsets is null 
-> checkpointNeeded returns false
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.close();
+
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldBeNoOpForNonLCSSStoreInMaybeMarkCorrupted() {
+        // Should not throw
+        LegacyCheckpointingStateStore.maybeMarkCorrupted(persistentStore);
+    }
+
+    // =====================================================================
+    // checkpointFileFor() — static helper
+    // =====================================================================
+
+    @Test
+    public void shouldReturnPerStoreCheckpointFileForTask() {
+        final File result = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, 
persistentStore);
+        final File expected = new File(
+            stateDirectory.getOrCreateDirectoryForTask(taskId),
+            CHECKPOINT_FILE_NAME + "_" + STORE_NAME
+        );
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void shouldReturnLegacyPerTaskCheckpointFile() {
+        final File result = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        final File expected = new File(
+            stateDirectory.getOrCreateDirectoryForTask(taskId),
+            CHECKPOINT_FILE_NAME
+        );
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void shouldReturnPerStoreCheckpointFileForGlobalStore() {
+        // globalStateDir() = new File(new File(baseDir, APPLICATION_ID), 
"global")
+        final File expectedGlobalDir = new File(new File(baseDir, 
APPLICATION_ID), "global");
+        final File expected = new File(expectedGlobalDir, CHECKPOINT_FILE_NAME 
+ "_" + STORE_NAME);
+
+        final File result = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, 
persistentStore);
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void shouldReturnLegacyGlobalCheckpointFile() {
+        final File expectedGlobalDir = new File(new File(baseDir, 
APPLICATION_ID), "global");
+        final File expected = new File(expectedGlobalDir, 
CHECKPOINT_FILE_NAME);
+
+        final File result = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, null);
+
+        assertEquals(expected, result);
+    }
+
+    // =====================================================================
+    // checkpointNeeded() — static helper
+    // =====================================================================
+
+    @Test
+    public void shouldReturnFalseFromCheckpointNeededWhenOldSnapshotIsNull() {
+        assertFalse(LegacyCheckpointingStateStore.checkpointNeeded(
+            null, Collections.singletonMap(partition, 100L)));
+    }
+
+    @Test
+    public void 
shouldReturnFalseFromCheckpointNeededWhenDeltaEqualsThreshold() {
+        // delta == threshold → NOT greater than → no checkpoint needed
+        final Map<TopicPartition, Long> oldOffsets = Collections.emptyMap();
+        final Map<TopicPartition, Long> newOffsets = Collections.singletonMap(
+            partition, OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT);
+
+        assertFalse(LegacyCheckpointingStateStore.checkpointNeeded(oldOffsets, 
newOffsets));
+    }
+
+    @Test
+    public void shouldReturnFalseFromCheckpointNeededWhenDeltaBelowThreshold() 
{
+        final Map<TopicPartition, Long> oldOffsets = Collections.emptyMap();
+        final Map<TopicPartition, Long> newOffsets = Collections.singletonMap(
+            partition, OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT / 2);
+
+        assertFalse(LegacyCheckpointingStateStore.checkpointNeeded(oldOffsets, 
newOffsets));
+    }
+
+    @Test
+    public void 
shouldReturnTrueFromCheckpointNeededWhenDeltaExceedsThreshold() {
+        final Map<TopicPartition, Long> oldOffsets = Collections.emptyMap();
+        final Map<TopicPartition, Long> newOffsets = Collections.singletonMap(
+            partition, OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L);
+
+        assertTrue(LegacyCheckpointingStateStore.checkpointNeeded(oldOffsets, 
newOffsets));
+    }
+
+    // =====================================================================
+    // init()
+    // =====================================================================
+
+    @Test
+    public void shouldSucceedOnInitWhenNoCheckpointFileExists() {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+
+        assertNull(store.committedOffset(partition));
+        assertTrue(persistentStore.initialized);
+    }
+
+    @Test
+    public void shouldLoadOffsetsFromCheckpointFileOnInit() throws IOException 
{
+        final long expectedOffset = 10L;
+        writeCheckpointFile(Collections.singletonMap(partition, 
expectedOffset));
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+
+        assertEquals(expectedOffset, store.committedOffset(partition));
+        assertTrue(persistentStore.initialized);
+    }
+
+    @Test
+    public void shouldIgnoreCheckpointEntriesForPartitionsNotInChangelogSet() 
throws IOException {
+        final TopicPartition irrelevantPartition = new 
TopicPartition("irrelevant-topic", 0);
+        final Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+        checkpointOffsets.put(partition, 10L);
+        checkpointOffsets.put(irrelevantPartition, 999L);
+        writeCheckpointFile(checkpointOffsets);
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+
+        assertEquals(10L, store.committedOffset(partition));
+        assertNull(store.committedOffset(irrelevantPartition));
+    }
+
+    @Test
+    public void shouldMapOffsetUnknownToNullOnInit() throws IOException {
+        writeCheckpointFile(Collections.singletonMap(partition, 
OFFSET_UNKNOWN));
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+
+        assertNull(store.committedOffset(partition));
+    }
+
+    @Test
+    public void shouldNotDeleteCheckpointFileAfterInitUnderALOS() throws 
IOException {
+        writeCheckpointFile(Collections.singletonMap(partition, 10L));
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+
+        assertTrue(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldDeleteCheckpointFileAfterInitUnderEOS() throws 
IOException {
+        writeCheckpointFile(Collections.singletonMap(partition, 10L));
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(true);
+        store.init(context, persistentStore);
+
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionOnCorruptCheckpointFile() 
throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        final File file = storeCheckpointFile();
+        Files.write(file.toPath(), "abcdefg".getBytes());
+
+        assertThrows(ProcessorStateException.class, () -> store.init(context, 
persistentStore));
+    }
+
+    // =====================================================================
+    // managesOffsets() / committedOffset()
+    // =====================================================================
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void shouldReturnTrueForManagesOffsets() {
+        assertTrue(createStore(false).managesOffsets());
+    }
+
+    @Test
+    public void shouldReturnNullForCommittedOffsetWhenNoneCommitted() {
+        assertNull(createStore(false).committedOffset(partition));
+    }
+
+    @Test
+    public void shouldReturnCommittedOffset() {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.commit(Collections.singletonMap(partition, 42L));
+
+        assertEquals(42L, store.committedOffset(partition));
+    }
+
+    // =====================================================================
+    // commit()
+    // =====================================================================
+
+    @Test
+    public void shouldUpdateInMemoryOffsetsOnCommit() {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.commit(Collections.singletonMap(partition, 100L));
+
+        assertEquals(100L, store.committedOffset(partition));
+    }
+
+    @Test
+    public void 
shouldPreserveExistingOffsetsWhenCommittingSubsetOfPartitions() {
+        final TopicPartition partitionTwo = new 
TopicPartition(CHANGELOG_TOPIC, 1);
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store =
+            new LegacyCheckpointingStateStore<>(
+                persistentStore, false, Set.of(partition, partitionTwo), 
stateDirectory, taskId, LOG_PREFIX);
+
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.commit(Collections.singletonMap(partitionTwo, 200L));
+
+        assertEquals(100L, store.committedOffset(partition));
+        assertEquals(200L, store.committedOffset(partitionTwo));
+    }
+
+    @Test
+    public void 
shouldWriteCheckpointDuringCommitUnderALOSWhenDeltaExceedsThreshold() throws 
IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore); // sets checkpointedOffsets = {}
+
+        final long offsetBeyondThreshold = 
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L;
+        store.commit(Collections.singletonMap(partition, 
offsetBeyondThreshold));
+
+        final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+        assertEquals(offsetBeyondThreshold, checkpointed.get(partition));
+    }
+
+    @Test
+    public void 
shouldNotWriteCheckpointDuringCommitUnderALOSWhenDeltaDoesNotExceedThreshold() 
throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore); // sets checkpointedOffsets = {}
+
+        store.commit(Collections.singletonMap(partition, 
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT));
+
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointDuringCommitUnderEOS() throws 
IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(true);
+        store.init(context, persistentStore); // sets checkpointedOffsets = {} 
and deletes any pre-existing file
+
+        final long offsetBeyondThreshold = 
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L;
+        store.commit(Collections.singletonMap(partition, 
offsetBeyondThreshold));
+
+        // Under EOS, commit() never writes the checkpoint file
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldAcceptNullOffsets() throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+        store.commit(Collections.singletonMap(partition, null));
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldCommitWhenOldOffsetIsNull() throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.init(context, persistentStore);
+
+        store.commit(Collections.singletonMap(partition, null));
+
+        final long offsetBeyondThreshold = 
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L;
+        store.commit(Collections.singletonMap(partition, 
offsetBeyondThreshold));
+
+        assertTrue(storeCheckpointFile().exists());
+    }
+
+    // =====================================================================
+    // checkpoint()
+    // =====================================================================
+
+    @Test
+    public void shouldWriteCommittedOffsetsToCheckpointFile() throws 
IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.checkpoint();
+
+        final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+        assertEquals(100L, checkpointed.get(partition));
+    }
+
+    @Test
+    public void shouldWriteOffsetUnknownSentinelWhenOffsetIsNull() throws 
IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        final Map<TopicPartition, Long> nullOffset = new HashMap<>();
+        nullOffset.put(partition, null);
+        store.commit(nullOffset);
+        store.checkpoint();
+
+        final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+        assertEquals(OFFSET_UNKNOWN, (long) checkpointed.get(partition));
+    }
+
+    @Test
+    public void shouldLogWarningInsteadOfThrowingWhenCheckpointWriteFails() 
throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.commit(Collections.singletonMap(partition, 10L));
+
+        // Delete the task directory so the checkpoint write will fail with 
IOException
+        Utils.delete(stateDirectory.getOrCreateDirectoryForTask(taskId));
+
+        try (final LogCaptureAppender appender =
+                 
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+            store.checkpoint(); // should log a warning, not throw
+
+            assertThat(appender.getMessages(),
+                hasItem(containsString("Failed to write offset checkpoint 
file")));
+        }
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointWhenChangelogPartitionsIsEmpty() 
throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store =
+            new LegacyCheckpointingStateStore<>(
+                persistentStore, false, Collections.emptySet(), 
stateDirectory, taskId, LOG_PREFIX);
+
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.checkpoint();
+
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForNonPersistentStore() throws 
IOException {
+        final MockKeyValueStore nonPersistentStore = new 
MockKeyValueStore(STORE_NAME, false);
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store =
+            new LegacyCheckpointingStateStore<>(
+                nonPersistentStore, false, Set.of(partition), stateDirectory, 
taskId, LOG_PREFIX);
+
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.checkpoint();
+
+        
assertFalse(LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, 
taskId, nonPersistentStore).exists());
+    }
+
+    // =====================================================================
+    // close()
+    // =====================================================================
+
+    @Test
+    public void shouldWriteCheckpointOnClose() throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.close();
+
+        final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+        assertEquals(100L, checkpointed.get(partition));
+    }
+
+    @Test
+    public void shouldWriteCheckpointOnCloseUnderEOS() throws IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(true);
+        store.init(context, persistentStore); // deletes any pre-existing 
file; sets checkpointedOffsets
+        store.commit(Collections.singletonMap(partition, 100L)); // no file 
write under EOS
+
+        assertFalse(storeCheckpointFile().exists());
+
+        store.close(); // writes checkpoint on close even under EOS
+
+        final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+        assertEquals(100L, checkpointed.get(partition));
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointOnCloseWhenCorrupted() throws 
IOException {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.commit(Collections.singletonMap(partition, 100L));
+        store.markAsCorrupted();
+        store.close();
+
+        assertFalse(storeCheckpointFile().exists());
+    }
+
+    @Test
+    public void shouldCloseWrappedStoreOnClose() {
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        store.close();
+
+        assertTrue(persistentStore.closed);
+    }
+
+    // =====================================================================
+    // migrateLegacyOffsets()
+    // =====================================================================
+
+    @Test
+    public void shouldMigrateOffsetsFromLegacyPerTaskFileAndDeleteIt() throws 
IOException {
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        new 
OffsetCheckpoint(legacyFile).write(Collections.singletonMap(partition, 100L));
+        assertTrue(legacyFile.exists());
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        LegacyCheckpointingStateStore.migrateLegacyOffsets(
+            LOG_PREFIX, stateDirectory, taskId, 
Collections.singletonMap(partition, store));
+
+        assertFalse(legacyFile.exists());
+        assertEquals(100L, store.committedOffset(partition));
+    }
+
+    @Test
+    public void shouldMigrateOffsetsFromLegacyGlobalFileAndDeleteIt() throws 
IOException {
+        final File legacyGlobalFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, null);
+        legacyGlobalFile.getParentFile().mkdirs();
+        new 
OffsetCheckpoint(legacyGlobalFile).write(Collections.singletonMap(partition, 
200L));
+        assertTrue(legacyGlobalFile.exists());
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store =
+            new LegacyCheckpointingStateStore<>(
+                persistentStore, false, Set.of(partition), stateDirectory, 
null, LOG_PREFIX);
+        LegacyCheckpointingStateStore.migrateLegacyOffsets(
+            LOG_PREFIX, stateDirectory, null, 
Collections.singletonMap(partition, store));
+
+        assertFalse(legacyGlobalFile.exists());
+        assertEquals(200L, store.committedOffset(partition));
+    }
+
+    @Test
+    public void shouldBeNoOpForMigrationWhenNoLegacyFileExists() {
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        assertFalse(legacyFile.exists());
+
+        final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        // Should not throw
+        LegacyCheckpointingStateStore.migrateLegacyOffsets(
+            LOG_PREFIX, stateDirectory, taskId, 
Collections.singletonMap(partition, store));
+
+        assertNull(store.committedOffset(partition));
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void shouldLogWarningWhenMigratingToStoreThatDoesNotManageOffsets() 
throws IOException {
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        new 
OffsetCheckpoint(legacyFile).write(Collections.singletonMap(partition, 100L));
+
+        // Use a raw MockKeyValueStore (managesOffsets() == false, not wrapped 
in LCSS)
+        try (final LogCaptureAppender appender =
+                 
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+            LegacyCheckpointingStateStore.migrateLegacyOffsets(
+                LOG_PREFIX, stateDirectory, taskId, 
Collections.singletonMap(partition, persistentStore));
+
+            assertThat(appender.getMessages(),
+                hasItem(containsString("does not manage its own offsets")));
+        }
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenMigrationFails() throws 
IOException {
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        new 
OffsetCheckpoint(legacyFile).write(Collections.singletonMap(partition, 100L));
+
+        final MockKeyValueStore throwingStore = new 
MockKeyValueStore(STORE_NAME, true) {
+            @Override
+            public void commit(final Map<TopicPartition, Long> 
changelogOffsets) {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+
+        assertThrows(ProcessorStateException.class, () ->
+            LegacyCheckpointingStateStore.migrateLegacyOffsets(
+                LOG_PREFIX, stateDirectory, taskId, 
Collections.singletonMap(partition, throwingStore)));
+    }
+
+    // =====================================================================
+    // Helpers
+    // =====================================================================
+
+    private LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
createStore(final boolean eosEnabled) {
+        return new LegacyCheckpointingStateStore<>(
+            persistentStore, eosEnabled, Set.of(partition), stateDirectory, 
taskId, LOG_PREFIX);
+    }
+
+    private File storeCheckpointFile() {
+        return LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, 
taskId, persistentStore);
+    }
+
+    private void writeCheckpointFile(final Map<TopicPartition, Long> offsets) 
throws IOException {
+        new OffsetCheckpoint(storeCheckpointFile()).write(offsets);
+    }
+
+    private Map<TopicPartition, Long> readCheckpointFile() throws IOException {
+        return new OffsetCheckpoint(storeCheckpointFile()).read();
+    }
+}


Reply via email to