This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7fed9d0235 KAFKA-19709: Add LegacyCheckpointingStateStore (#21554)
d7fed9d0235 is described below
commit d7fed9d02354323fa031f99b758276cef2093afa
Author: Nick Telford <[email protected]>
AuthorDate: Wed Mar 11 20:14:28 2026 +0000
KAFKA-19709: Add LegacyCheckpointingStateStore (#21554)
Extracts the handling of `.checkpoint` files, which will become a legacy
feature as part of KIP-1035, to a separate
`LegacyCheckpointingStateStore`, which will wrap any `StateStore` that
needs it.
This implementation is currently unused, as the changes that introduce
its usage will be made as part of a follow-up PR.
A core goal of this implementation is to fully encapsulate
"checkpointing", and the management of `.checkpoint` files within this
class. Consequently, there are a few behavioural differences:
1. Checkpointing is now done per-store, rather than per-task.
2. Since multiple stores can share the same `.checkpoint` file, when we
checkpoint, we have to read the current `.checkpoint` file and merge our
new offsets into it, to prevent removing the offsets from other stores.
3. Decisions about *when* to checkpoint are now taken during `commit`.
Reviewers: Bill Bejeck <[email protected]>
---
.../processor/internals/StateDirectory.java | 2 +-
.../internals/LegacyCheckpointingStateStore.java | 309 ++++++++++
.../LegacyCheckpointingStateStoreTest.java | 668 +++++++++++++++++++++
3 files changed, 978 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 5b37c1bf427..f66f177c0ef 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -474,7 +474,7 @@ public class StateDirectory implements AutoCloseable {
* @return directory for the global stores
* @throws ProcessorStateException if the global store directory does not
exist and could not be created
*/
- File globalStateDir() {
+ public File globalStateDir() {
final File dir = new File(stateDir, "global");
if (hasPersistentStores) {
if (!dir.exists() && !dir.mkdir()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
new file mode 100644
index 00000000000..a5c3a42b3c8
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
+
+public class LegacyCheckpointingStateStore<S extends StateStore, K, V> extends
WrappedStateStore<S, K, V> {
+ public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+
+ private static final Logger log =
LoggerFactory.getLogger(LegacyCheckpointingStateStore.class);
+
+ static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
+
+ private final boolean eosEnabled;
+ private final Set<TopicPartition> changelogPartitions;
+ private final StateDirectory stateDirectory;
+ private final TaskId taskId;
+ private final OffsetCheckpoint checkpointFile;
+ private final String logPrefix;
+
+ private final Map<TopicPartition, Long> offsets = new HashMap<>();
+ private Map<TopicPartition, Long> checkpointedOffsets;
+ private boolean corrupted = false;
+
+ /**
+ * Wraps the given {@link StateStore} as a {@code
LegacyCheckpointingStateStore}, only if it is both
+ * {@link StateStore#persistent() persistent}, and it does not {@link
StateStore#managesOffsets() manage its own offsets}.
+ */
+ @SuppressWarnings("deprecation")
+ public static <S extends StateStore, K, V> StateStore maybeWrapStore(final
S wrapped,
+ final
boolean eosEnabled,
+ final
Set<TopicPartition> changelogPartitions,
+ final
StateDirectory stateDirectory,
+ final
TaskId taskId,
+ final
String logPrefix) {
+ return wrapped.persistent() && !wrapped.managesOffsets()
+ ? new LegacyCheckpointingStateStore<>(wrapped, eosEnabled,
changelogPartitions, stateDirectory, taskId, logPrefix)
+ : wrapped;
+ }
+
+ /**
+ * Unwraps the given store, only if it is a {@code
LegacyCheckpointingStateStore}.
+ */
+ public static StateStore maybeUnwrapStore(final StateStore store) {
+ return (store instanceof LegacyCheckpointingStateStore<?, ?, ?>)
+ ? ((LegacyCheckpointingStateStore<?, ?, ?>) store).wrapped()
+ : store;
+ }
+
+ public static void maybeMarkCorrupted(final StateStore store) {
+ if (store instanceof LegacyCheckpointingStateStore<?, ?, ?>) {
+ ((LegacyCheckpointingStateStore<?, ?, ?>) store).markAsCorrupted();
+ }
+ }
+
+ /**
+ * Migrates offsets stored in a legacy, global/per-task .checkpoint file
into the {@code stores}.
+ *
+ * The {@code stores} <em>MUST</em> manage their own offsets (i.e. {@link
#managesOffsets()} must be {@code true}.
+ * They can either do this themselves, or be wrapped in a {@link
LegacyCheckpointingStateStore} implementation.
+ *
+ * Once this method successfully returns, the legacy {@code .checkpoint}
file for the given {@link TaskId} (or the
+ * global checkpoint, when {@code taskId} is {@code null}), will have been
migrated and deleted from the filesystem.
+ *
+ * @param logPrefix Log prefix to use for log messages.
+ * @param stateDirectory The singleton {@link StateDirectory} used for
looking up existing checkpoint files.
+ * @param taskId Either the task ID for regular stores, or {@code null} to
migrate global stores.
+ * @param stores A {@link Map} of {@link TopicPartition changelog
partitions} to their {@link StateStore}. For global
+ * stores, which may have multiple {@link TopicPartition
changelog partitions}, stores may appear
+ * multiple times, once for each of its {@link
TopicPartition changelog partitions}.
+ */
+ @SuppressWarnings("deprecation")
+ public static void migrateLegacyOffsets(final String logPrefix,
+ final StateDirectory
stateDirectory,
+ final TaskId taskId,
+ final Map<TopicPartition,
StateStore> stores) {
+ // load legacy per-task checkpoint
+ final File legacyCheckpointFile = checkpointFileFor(stateDirectory,
taskId, null);
+
+ if (legacyCheckpointFile.exists()) {
+ log.info("Migrating legacy checkpoint file for task {}", taskId);
+ final OffsetCheckpoint legacyCheckpoint = new
OffsetCheckpoint(legacyCheckpointFile);
+
+ try {
+ // build offsets for each store
+ final Map<StateStore, Map<TopicPartition, Long>>
storesToMigrate = new HashMap<>();
+ for (final Map.Entry<TopicPartition, Long> entry :
legacyCheckpoint.read().entrySet()) {
+ final StateStore store = stores.get(entry.getKey());
+ if (store != null) {
+ storesToMigrate.computeIfAbsent(store, k -> new
HashMap<>()).put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // commit checkpointed offsets to each store
+ for (final Map.Entry<StateStore, Map<TopicPartition, Long>>
entry : storesToMigrate.entrySet()) {
+ final StateStore store = entry.getKey();
+ if (!store.managesOffsets()) {
+ log.warn("{}Error migrating legacy checkpoint offsets:
StateStore '{}' does not manage its own offsets. " +
+ "The checkpointed offsets for this store will
not be migrated, and will be lost. " +
+ "This store will need to fully restore its
state on application restart. " +
+ "This is a bug in Kafka Streams, and should
never be possible.", logPrefix, store.name());
+ }
+
+ // attempt to commit the offsets, even if the store
doesn't manage them itself
+ store.commit(entry.getValue());
+ }
+
+ // delete legacy checkpoint file
+ legacyCheckpoint.delete();
+
+ log.info("Migrated legacy checkpoint file for task {} with
offsets migrated for {} stores", taskId, storesToMigrate.size());
+ } catch (final IOException | RuntimeException e) {
+ throw new ProcessorStateException(String.format("%sError
migrating checkpoint file for task '%s'", logPrefix, taskId), e);
+ }
+ } else {
+ log.debug("No legacy checkpoint file found for task {}", taskId);
+ }
+ }
+
+ LegacyCheckpointingStateStore(final S wrapped,
+ final boolean eosEnabled,
+ final Set<TopicPartition>
changelogPartitions,
+ final StateDirectory stateDirectory,
+ final TaskId taskId,
+ final String logPrefix) {
+ super(wrapped);
+ this.eosEnabled = eosEnabled;
+ this.changelogPartitions = changelogPartitions;
+ this.stateDirectory = stateDirectory;
+ this.taskId = taskId;
+ this.checkpointFile = new
OffsetCheckpoint(checkpointFileFor(stateDirectory, taskId, this));
+ this.logPrefix = logPrefix;
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ // load store offsets from checkpoint file
+ try {
+ final Map<TopicPartition, Long> allOffsets = checkpointFile.read();
+ for (final Map.Entry<TopicPartition, Long> entry :
allOffsets.entrySet()) {
+ if (changelogPartitions.contains(entry.getKey())) {
+ offsets.put(entry.getKey(),
changelogOffsetFromCheckpointedOffset(entry.getValue()));
+ }
+ }
+ checkpointedOffsets = new HashMap<>(offsets);
+ } catch (final IOException | RuntimeException e) {
+ throw new ProcessorStateException(String.format("%sError loading
checkpoint file when creating StateStore '%s'", logPrefix, name()), e);
+ }
+
+ // initialize the actual store
+ super.init(stateStoreContext, root);
+
+ // under EOS, we delete the checkpoint file after everything has been
loaded to ensure state is wiped after a crash
+ try {
+ if (eosEnabled) {
+ checkpointFile.delete();
+ }
+ } catch (final IOException e) {
+ throw new ProcessorStateException(String.format("%sError deleting
checkpoint file when creating StateStore '%s'", logPrefix, name()), e);
+ }
+ }
+
+ @Override
+ @Deprecated
+ public boolean managesOffsets() {
+ return true;
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return offsets.get(partition);
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ super.commit(changelogOffsets);
+
+ // update in-memory offsets
+ offsets.putAll(changelogOffsets);
+
+ // only write the checkpoint file if both:
+ // 1. in ALOS mode (under EOS, the checkpoint file is only written
when closing the store)
+ // 2. we have written enough new data to the store to warrant updating
the checkpoint (prevents disk thrashing)
+ if (!eosEnabled && checkpointNeeded(checkpointedOffsets, offsets)) {
+ checkpoint();
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ if (!corrupted) {
+ checkpoint();
+ }
+ }
+
+ public void markAsCorrupted() {
+ corrupted = true;
+ }
+
+ /**
+ * "checkpoint" committed offsets to disk.
+ */
+ void checkpoint() {
+ // only checkpoint persistent and logged stores
+ if (persistent() && !changelogPartitions.isEmpty()) {
+ try {
+ // merge new checkpoint offsets into checkpoint file
+ final Map<TopicPartition, Long> checkpointingOffsets = new
HashMap<>(offsets.size());
+ for (final Map.Entry<TopicPartition, Long> entry :
offsets.entrySet()) {
+ checkpointingOffsets.put(entry.getKey(),
checkpointableOffsetFromChangelogOffset(entry.getValue()));
+ }
+
+ log.debug("Writing checkpoint: {} for task {}",
checkpointingOffsets, taskId);
+ checkpointFile.write(checkpointingOffsets);
+ checkpointedOffsets = new HashMap<>(offsets);
+ } catch (final IOException e) {
+ log.warn("{}Failed to write offset checkpoint file to [{}]." +
+ " This may occur if OS cleaned the state.dir
in case when it located in ${java.io.tmpdir} directory." +
+ " This may also occur due to running multiple
instances on the same machine using the same state dir." +
+ " Changing the location of state.dir may
resolve the problem.",
+ logPrefix, checkpointFile, e);
+ }
+ }
+ }
+
+ static File checkpointFileFor(final StateDirectory stateDirectory,
+ final TaskId taskId,
+ final StateStore store) {
+ return taskId == null ?
+ // global store
+ (store == null ?
+ // legacy, global file
+ new File(stateDirectory.globalStateDir(),
CHECKPOINT_FILE_NAME) :
+ // per-store file
+ new File(stateDirectory.globalStateDir(),
CHECKPOINT_FILE_NAME + "_" + store.name())
+ ) :
+ (store == null ?
+ // legacy, per-task file
+ new
File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME) :
+ // per-store file
+ new
File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME +
"_" + store.name())
+ );
+ }
+
+ static boolean checkpointNeeded(final Map<TopicPartition, Long>
oldOffsetSnapshot,
+ final Map<TopicPartition, Long>
newOffsetSnapshot) {
+ // we should always have the old snapshot post completing the register
state stores;
+ // if it is null it means the registration is not done and hence we
should not overwrite the checkpoint
+ if (oldOffsetSnapshot == null) {
+ return false;
+ }
+
+ // we can checkpoint if the difference between the current and the
previous snapshot is large enough
+ long totalOffsetDelta = 0L;
+ for (final Map.Entry<TopicPartition, Long> entry :
newOffsetSnapshot.entrySet()) {
+ final Long newOffset = entry.getValue();
+ if (newOffset != null) {
+ final Long oldOffset = oldOffsetSnapshot.get(entry.getKey());
+ totalOffsetDelta += newOffset - (oldOffset == null ? 0L :
oldOffset);
+ }
+ }
+
+ // when enforcing checkpoint is required, we should overwrite the
checkpoint if it is different from the old one;
+ // otherwise, we only overwrite the checkpoint if it is largely
different from the old one
+ return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
+ }
+
+ // Pass in a sentinel value to checkpoint when the changelog offset is not
yet initialized/known
+ private static long checkpointableOffsetFromChangelogOffset(final Long
offset) {
+ return offset != null ? offset : OFFSET_UNKNOWN;
+ }
+
+ // Convert the written offsets in the checkpoint file back to the
changelog offset
+ private static Long changelogOffsetFromCheckpointedOffset(final long
offset) {
+ return offset != OFFSET_UNKNOWN ? offset : null;
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
new file mode 100644
index 00000000000..493e437e2c7
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.test.MockKeyValueStore;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME;
+import static
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
+import static
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link LegacyCheckpointingStateStore}.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class LegacyCheckpointingStateStoreTest {
+
+ private static final String APPLICATION_ID = "test-application";
+ private static final String LOG_PREFIX = "test-prefix ";
+ private static final String STORE_NAME = "test-store";
+ private static final String CHANGELOG_TOPIC = "test-topic";
+
+ private final TopicPartition partition = new
TopicPartition(CHANGELOG_TOPIC, 0);
+ private final TaskId taskId = new TaskId(0, 0);
+
+ private File baseDir;
+ private StateDirectory stateDirectory;
+ private MockKeyValueStore persistentStore;
+
+ @Mock
+ private StateStoreContext context;
+
+ @BeforeEach
+ public void setUp() {
+ baseDir = TestUtils.tempDirectory();
+ stateDirectory = new StateDirectory(new StreamsConfig(new Properties()
{
+ {
+ put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+ put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+ put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
+ }
+ }), new MockTime(), true, true);
+ persistentStore = new MockKeyValueStore(STORE_NAME, true);
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ Utils.delete(baseDir);
+ }
+
+ // =====================================================================
+ // maybeWrapStore()
+ // =====================================================================
+
+ @Test
+ public void shouldWrapPersistentNonOffsetManagingStore() {
+ final StateStore result = LegacyCheckpointingStateStore.maybeWrapStore(
+ persistentStore, false, Set.of(partition), stateDirectory, taskId,
LOG_PREFIX);
+
+ assertThat(result, instanceOf(LegacyCheckpointingStateStore.class));
+ }
+
+ @Test
+ public void shouldNotWrapNonPersistentStore() {
+ final MockKeyValueStore nonPersistentStore = new
MockKeyValueStore(STORE_NAME, false);
+
+ final StateStore result = LegacyCheckpointingStateStore.maybeWrapStore(
+ nonPersistentStore, false, Set.of(partition), stateDirectory,
taskId, LOG_PREFIX);
+
+ assertThat(result,
not(instanceOf(LegacyCheckpointingStateStore.class)));
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldNotWrapOffsetManagingStore() {
+ final MockKeyValueStore offsetManagingStore = new
MockKeyValueStore(STORE_NAME, true) {
+ @Override
+ public boolean managesOffsets() {
+ return true;
+ }
+ };
+
+ final StateStore result = LegacyCheckpointingStateStore.maybeWrapStore(
+ offsetManagingStore, false, Set.of(partition), stateDirectory,
taskId, LOG_PREFIX);
+
+ assertThat(result,
not(instanceOf(LegacyCheckpointingStateStore.class)));
+ }
+
+ // =====================================================================
+ // maybeUnwrapStore()
+ // =====================================================================
+
+ @Test
+ public void shouldUnwrapLCSSToInnerStore() {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
lcss = createStore(false);
+
+ final StateStore result =
LegacyCheckpointingStateStore.maybeUnwrapStore(lcss);
+
+ assertEquals(persistentStore, result);
+ }
+
+ @Test
+ public void shouldReturnNonLCSSStoreUnchangedFromMaybeUnwrap() {
+ final StateStore result =
LegacyCheckpointingStateStore.maybeUnwrapStore(persistentStore);
+
+ assertEquals(persistentStore, result);
+ }
+
+ // =====================================================================
+ // maybeMarkCorrupted()
+ // =====================================================================
+
+ @Test
+ public void shouldMarkLCSSAsCorruptedSoCloseDoesNotWriteCheckpoint()
throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+
+ LegacyCheckpointingStateStore.maybeMarkCorrupted(store);
+ // commit() won't write checkpoint either: checkpointedOffsets is null
-> checkpointNeeded returns false
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.close();
+
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldBeNoOpForNonLCSSStoreInMaybeMarkCorrupted() {
+ // Should not throw
+ LegacyCheckpointingStateStore.maybeMarkCorrupted(persistentStore);
+ }
+
+ // =====================================================================
+ // checkpointFileFor() — static helper
+ // =====================================================================
+
+ @Test
+ public void shouldReturnPerStoreCheckpointFileForTask() {
+ final File result =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId,
persistentStore);
+ final File expected = new File(
+ stateDirectory.getOrCreateDirectoryForTask(taskId),
+ CHECKPOINT_FILE_NAME + "_" + STORE_NAME
+ );
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void shouldReturnLegacyPerTaskCheckpointFile() {
+ final File result =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ final File expected = new File(
+ stateDirectory.getOrCreateDirectoryForTask(taskId),
+ CHECKPOINT_FILE_NAME
+ );
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void shouldReturnPerStoreCheckpointFileForGlobalStore() {
+ // globalStateDir() = new File(new File(baseDir, APPLICATION_ID),
"global")
+ final File expectedGlobalDir = new File(new File(baseDir,
APPLICATION_ID), "global");
+ final File expected = new File(expectedGlobalDir, CHECKPOINT_FILE_NAME
+ "_" + STORE_NAME);
+
+ final File result =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null,
persistentStore);
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void shouldReturnLegacyGlobalCheckpointFile() {
+ final File expectedGlobalDir = new File(new File(baseDir,
APPLICATION_ID), "global");
+ final File expected = new File(expectedGlobalDir,
CHECKPOINT_FILE_NAME);
+
+ final File result =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, null);
+
+ assertEquals(expected, result);
+ }
+
+ // =====================================================================
+ // checkpointNeeded() — static helper
+ // =====================================================================
+
+ @Test
+ public void shouldReturnFalseFromCheckpointNeededWhenOldSnapshotIsNull() {
+ assertFalse(LegacyCheckpointingStateStore.checkpointNeeded(
+ null, Collections.singletonMap(partition, 100L)));
+ }
+
+ @Test
+ public void
shouldReturnFalseFromCheckpointNeededWhenDeltaEqualsThreshold() {
+ // delta == threshold → NOT greater than → no checkpoint needed
+ final Map<TopicPartition, Long> oldOffsets = Collections.emptyMap();
+ final Map<TopicPartition, Long> newOffsets = Collections.singletonMap(
+ partition, OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT);
+
+ assertFalse(LegacyCheckpointingStateStore.checkpointNeeded(oldOffsets,
newOffsets));
+ }
+
+ @Test
+ public void shouldReturnFalseFromCheckpointNeededWhenDeltaBelowThreshold()
{
+ final Map<TopicPartition, Long> oldOffsets = Collections.emptyMap();
+ final Map<TopicPartition, Long> newOffsets = Collections.singletonMap(
+ partition, OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT / 2);
+
+ assertFalse(LegacyCheckpointingStateStore.checkpointNeeded(oldOffsets,
newOffsets));
+ }
+
+ @Test
+ public void
shouldReturnTrueFromCheckpointNeededWhenDeltaExceedsThreshold() {
+ final Map<TopicPartition, Long> oldOffsets = Collections.emptyMap();
+ final Map<TopicPartition, Long> newOffsets = Collections.singletonMap(
+ partition, OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L);
+
+ assertTrue(LegacyCheckpointingStateStore.checkpointNeeded(oldOffsets,
newOffsets));
+ }
+
+ // =====================================================================
+ // init()
+ // =====================================================================
+
+ @Test
+ public void shouldSucceedOnInitWhenNoCheckpointFileExists() {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+
+ assertNull(store.committedOffset(partition));
+ assertTrue(persistentStore.initialized);
+ }
+
+ @Test
+ public void shouldLoadOffsetsFromCheckpointFileOnInit() throws IOException
{
+ final long expectedOffset = 10L;
+ writeCheckpointFile(Collections.singletonMap(partition,
expectedOffset));
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+
+ assertEquals(expectedOffset, store.committedOffset(partition));
+ assertTrue(persistentStore.initialized);
+ }
+
+ @Test
+ public void shouldIgnoreCheckpointEntriesForPartitionsNotInChangelogSet()
throws IOException {
+ final TopicPartition irrelevantPartition = new
TopicPartition("irrelevant-topic", 0);
+ final Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+ checkpointOffsets.put(partition, 10L);
+ checkpointOffsets.put(irrelevantPartition, 999L);
+ writeCheckpointFile(checkpointOffsets);
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+
+ assertEquals(10L, store.committedOffset(partition));
+ assertNull(store.committedOffset(irrelevantPartition));
+ }
+
+ @Test
+ public void shouldMapOffsetUnknownToNullOnInit() throws IOException {
+ writeCheckpointFile(Collections.singletonMap(partition,
OFFSET_UNKNOWN));
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+
+ assertNull(store.committedOffset(partition));
+ }
+
+ @Test
+ public void shouldNotDeleteCheckpointFileAfterInitUnderALOS() throws
IOException {
+ writeCheckpointFile(Collections.singletonMap(partition, 10L));
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+
+ assertTrue(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldDeleteCheckpointFileAfterInitUnderEOS() throws
IOException {
+ writeCheckpointFile(Collections.singletonMap(partition, 10L));
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(true);
+ store.init(context, persistentStore);
+
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldThrowProcessorStateExceptionOnCorruptCheckpointFile()
throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ final File file = storeCheckpointFile();
+ Files.write(file.toPath(), "abcdefg".getBytes());
+
+ assertThrows(ProcessorStateException.class, () -> store.init(context,
persistentStore));
+ }
+
+ // =====================================================================
+ // managesOffsets() / committedOffset()
+ // =====================================================================
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldReturnTrueForManagesOffsets() {
+ assertTrue(createStore(false).managesOffsets());
+ }
+
+ @Test
+ public void shouldReturnNullForCommittedOffsetWhenNoneCommitted() {
+ assertNull(createStore(false).committedOffset(partition));
+ }
+
+ @Test
+ public void shouldReturnCommittedOffset() {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.commit(Collections.singletonMap(partition, 42L));
+
+ assertEquals(42L, store.committedOffset(partition));
+ }
+
+ // =====================================================================
+ // commit()
+ // =====================================================================
+
+ @Test
+ public void shouldUpdateInMemoryOffsetsOnCommit() {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.commit(Collections.singletonMap(partition, 100L));
+
+ assertEquals(100L, store.committedOffset(partition));
+ }
+
+ @Test
+ public void
shouldPreserveExistingOffsetsWhenCommittingSubsetOfPartitions() {
+ final TopicPartition partitionTwo = new
TopicPartition(CHANGELOG_TOPIC, 1);
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store =
+ new LegacyCheckpointingStateStore<>(
+ persistentStore, false, Set.of(partition, partitionTwo),
stateDirectory, taskId, LOG_PREFIX);
+
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.commit(Collections.singletonMap(partitionTwo, 200L));
+
+ assertEquals(100L, store.committedOffset(partition));
+ assertEquals(200L, store.committedOffset(partitionTwo));
+ }
+
+ @Test
+ public void
shouldWriteCheckpointDuringCommitUnderALOSWhenDeltaExceedsThreshold() throws
IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore); // sets checkpointedOffsets = {}
+
+ final long offsetBeyondThreshold =
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L;
+ store.commit(Collections.singletonMap(partition,
offsetBeyondThreshold));
+
+ final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+ assertEquals(offsetBeyondThreshold, checkpointed.get(partition));
+ }
+
+ @Test
+ public void
shouldNotWriteCheckpointDuringCommitUnderALOSWhenDeltaDoesNotExceedThreshold()
throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore); // sets checkpointedOffsets = {}
+
+ store.commit(Collections.singletonMap(partition,
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT));
+
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldNotWriteCheckpointDuringCommitUnderEOS() throws
IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(true);
+ store.init(context, persistentStore); // sets checkpointedOffsets = {}
and deletes any pre-existing file
+
+ final long offsetBeyondThreshold =
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L;
+ store.commit(Collections.singletonMap(partition,
offsetBeyondThreshold));
+
+ // Under EOS, commit() never writes the checkpoint file
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldAcceptNullOffsets() throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+ store.commit(Collections.singletonMap(partition, null));
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldCommitWhenOldOffsetIsNull() throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.init(context, persistentStore);
+
+ store.commit(Collections.singletonMap(partition, null));
+
+ final long offsetBeyondThreshold =
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT + 1L;
+ store.commit(Collections.singletonMap(partition,
offsetBeyondThreshold));
+
+ assertTrue(storeCheckpointFile().exists());
+ }
+
+ // =====================================================================
+ // checkpoint()
+ // =====================================================================
+
+ @Test
+ public void shouldWriteCommittedOffsetsToCheckpointFile() throws
IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.checkpoint();
+
+ final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+ assertEquals(100L, checkpointed.get(partition));
+ }
+
+ @Test
+ public void shouldWriteOffsetUnknownSentinelWhenOffsetIsNull() throws
IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ final Map<TopicPartition, Long> nullOffset = new HashMap<>();
+ nullOffset.put(partition, null);
+ store.commit(nullOffset);
+ store.checkpoint();
+
+ final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+ assertEquals(OFFSET_UNKNOWN, (long) checkpointed.get(partition));
+ }
+
+ @Test
+ public void shouldLogWarningInsteadOfThrowingWhenCheckpointWriteFails()
throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.commit(Collections.singletonMap(partition, 10L));
+
+ // Delete the task directory so the checkpoint write will fail with
IOException
+ Utils.delete(stateDirectory.getOrCreateDirectoryForTask(taskId));
+
+ try (final LogCaptureAppender appender =
+
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+ store.checkpoint(); // should log a warning, not throw
+
+ assertThat(appender.getMessages(),
+ hasItem(containsString("Failed to write offset checkpoint
file")));
+ }
+ }
+
+ @Test
+ public void shouldNotWriteCheckpointWhenChangelogPartitionsIsEmpty()
throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store =
+ new LegacyCheckpointingStateStore<>(
+ persistentStore, false, Collections.emptySet(),
stateDirectory, taskId, LOG_PREFIX);
+
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.checkpoint();
+
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldNotWriteCheckpointForNonPersistentStore() throws
IOException {
+ final MockKeyValueStore nonPersistentStore = new
MockKeyValueStore(STORE_NAME, false);
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store =
+ new LegacyCheckpointingStateStore<>(
+ nonPersistentStore, false, Set.of(partition), stateDirectory,
taskId, LOG_PREFIX);
+
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.checkpoint();
+
+
assertFalse(LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory,
taskId, nonPersistentStore).exists());
+ }
+
+ // =====================================================================
+ // close()
+ // =====================================================================
+
+ @Test
+ public void shouldWriteCheckpointOnClose() throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.close();
+
+ final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+ assertEquals(100L, checkpointed.get(partition));
+ }
+
+ @Test
+ public void shouldWriteCheckpointOnCloseUnderEOS() throws IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(true);
+ store.init(context, persistentStore); // deletes any pre-existing
file; sets checkpointedOffsets
+ store.commit(Collections.singletonMap(partition, 100L)); // no file
write under EOS
+
+ assertFalse(storeCheckpointFile().exists());
+
+ store.close(); // writes checkpoint on close even under EOS
+
+ final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
+ assertEquals(100L, checkpointed.get(partition));
+ }
+
+ @Test
+ public void shouldNotWriteCheckpointOnCloseWhenCorrupted() throws
IOException {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.commit(Collections.singletonMap(partition, 100L));
+ store.markAsCorrupted();
+ store.close();
+
+ assertFalse(storeCheckpointFile().exists());
+ }
+
+ @Test
+ public void shouldCloseWrappedStoreOnClose() {
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ store.close();
+
+ assertTrue(persistentStore.closed);
+ }
+
+ // =====================================================================
+ // migrateLegacyOffsets()
+ // =====================================================================
+
+ @Test
+ public void shouldMigrateOffsetsFromLegacyPerTaskFileAndDeleteIt() throws
IOException {
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ new
OffsetCheckpoint(legacyFile).write(Collections.singletonMap(partition, 100L));
+ assertTrue(legacyFile.exists());
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(
+ LOG_PREFIX, stateDirectory, taskId,
Collections.singletonMap(partition, store));
+
+ assertFalse(legacyFile.exists());
+ assertEquals(100L, store.committedOffset(partition));
+ }
+
+ @Test
+ public void shouldMigrateOffsetsFromLegacyGlobalFileAndDeleteIt() throws
IOException {
+ final File legacyGlobalFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, null);
+ legacyGlobalFile.getParentFile().mkdirs();
+ new
OffsetCheckpoint(legacyGlobalFile).write(Collections.singletonMap(partition,
200L));
+ assertTrue(legacyGlobalFile.exists());
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store =
+ new LegacyCheckpointingStateStore<>(
+ persistentStore, false, Set.of(partition), stateDirectory,
null, LOG_PREFIX);
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(
+ LOG_PREFIX, stateDirectory, null,
Collections.singletonMap(partition, store));
+
+ assertFalse(legacyGlobalFile.exists());
+ assertEquals(200L, store.committedOffset(partition));
+ }
+
+ @Test
+ public void shouldBeNoOpForMigrationWhenNoLegacyFileExists() {
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ assertFalse(legacyFile.exists());
+
+ final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ // Should not throw
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(
+ LOG_PREFIX, stateDirectory, taskId,
Collections.singletonMap(partition, store));
+
+ assertNull(store.committedOffset(partition));
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldLogWarningWhenMigratingToStoreThatDoesNotManageOffsets()
throws IOException {
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ new
OffsetCheckpoint(legacyFile).write(Collections.singletonMap(partition, 100L));
+
+ // Use a raw MockKeyValueStore (managesOffsets() == false, not wrapped
in LCSS)
+ try (final LogCaptureAppender appender =
+
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(
+ LOG_PREFIX, stateDirectory, taskId,
Collections.singletonMap(partition, persistentStore));
+
+ assertThat(appender.getMessages(),
+ hasItem(containsString("does not manage its own offsets")));
+ }
+ }
+
+ @Test
+ public void shouldThrowProcessorStateExceptionWhenMigrationFails() throws
IOException {
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ new
OffsetCheckpoint(legacyFile).write(Collections.singletonMap(partition, 100L));
+
+ final MockKeyValueStore throwingStore = new
MockKeyValueStore(STORE_NAME, true) {
+ @Override
+ public void commit(final Map<TopicPartition, Long>
changelogOffsets) {
+ throw new RuntimeException("KABOOM!");
+ }
+ };
+
+ assertThrows(ProcessorStateException.class, () ->
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(
+ LOG_PREFIX, stateDirectory, taskId,
Collections.singletonMap(partition, throwingStore)));
+ }
+
+ // =====================================================================
+ // Helpers
+ // =====================================================================
+
+ private LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
createStore(final boolean eosEnabled) {
+ return new LegacyCheckpointingStateStore<>(
+ persistentStore, eosEnabled, Set.of(partition), stateDirectory,
taskId, LOG_PREFIX);
+ }
+
+ private File storeCheckpointFile() {
+ return LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory,
taskId, persistentStore);
+ }
+
+ private void writeCheckpointFile(final Map<TopicPartition, Long> offsets)
throws IOException {
+ new OffsetCheckpoint(storeCheckpointFile()).write(offsets);
+ }
+
+ private Map<TopicPartition, Long> readCheckpointFile() throws IOException {
+ return new OffsetCheckpoint(storeCheckpointFile()).read();
+ }
+}