This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6c83a29a5e3 KAFKA-20489: Add enable.transactional.statestores (#22141)
6c83a29a5e3 is described below
commit 6c83a29a5e39dda753166042eb9ad5ea658c3fa2
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 15 22:19:18 2026 +0100
KAFKA-20489: Add enable.transactional.statestores (#22141)
Introduces the `enable.transactional.statestores` config (default
false). When enabled, uncommitted writes are held in an in-memory buffer
per store and are not flushed to the underlying base store until the
Kafka transaction commits, making staged writes invisible to Interactive
Query (IQ) reads at the READ_COMMITTED isolation level until the
containing commit completes.
The config is threaded through TopologyConfig, TaskConfig, task
creators, and ProcessorStateManager. When transactional stores are
active, the exactly-once (EOS) state wipe on unclean shutdown is
suppressed unless a store has been marked as corrupted: since
uncommitted data never reaches the base store, there is otherwise
nothing to wipe; corruption is flagged explicitly via
markChangelogAsCorrupted.
StreamTask.postCommit is extended to flush the pending write buffer on
every commit interval, not only on task revocation or close. Under EOS,
the normal commit-interval path previously skipped maybeCheckpoint,
which is the only place that calls stateMgr.commit() to flush each
store's buffer. Without this change, READ_COMMITTED IQ readers would see
no new data mid-run, and the uncommitted buffer would grow unbounded
between task lifecycle events.
Reviewers: Bill Bejeck <[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 13 +++++
.../org/apache/kafka/streams/TopologyConfig.java | 12 ++++-
.../processor/internals/ActiveTaskCreator.java | 1 +
.../processor/internals/ProcessorStateManager.java | 14 +++--
.../streams/processor/internals/StandbyTask.java | 3 ++
.../processor/internals/StandbyTaskCreator.java | 1 +
.../processor/internals/StateManagerUtil.java | 8 ++-
.../streams/processor/internals/StreamTask.java | 5 +-
.../streams/processor/internals/TaskManager.java | 1 +
.../apache/kafka/streams/StreamsConfigTest.java | 13 +++++
.../internals/ProcessorStateManagerTest.java | 62 ++++++++++++++++++++++
.../processor/internals/StandbyTaskTest.java | 23 ++++++++
.../processor/internals/StateManagerUtilTest.java | 47 +++++++++++++---
.../processor/internals/StreamTaskTest.java | 45 +++++++++++++++-
.../processor/internals/TaskManagerTest.java | 10 ++--
.../StreamThreadStateStoreProviderTest.java | 1 +
.../apache/kafka/streams/TopologyTestDriver.java | 1 +
17 files changed, 240 insertions(+), 20 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a1c06e15221..b56b97407ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -575,6 +575,14 @@ public class StreamsConfig extends AbstractConfig {
/** {@code enable.metrics.push} */
@SuppressWarnings("WeakerAccess")
public static final String ENABLE_METRICS_PUSH_CONFIG =
CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
+
+ /** {@code enable.transactional.statestores} */
+ public static final String TRANSACTIONAL_STATE_STORES_CONFIG =
"enable.transactional.statestores";
+ private static final String TRANSACTIONAL_STATE_STORES_DOC = "Whether to
enable transactional state stores. " +
+ "When enabled, state stores will buffer writes in a transaction
buffer (if supported by the state store implementation), " +
+ "before committing them when the corresponding Kafka changelog
transaction has committed. \n" +
+ "Under EOS, state stores will no longer be wiped on-error and
rebuilt from scratch. " +
+ "In the event of an error (under either EOS or ALOS), only the
writes since the last successful commit will be lost and replayed through the
topology.";
@Deprecated
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable
pushing of internal client metrics for (main, restore, and global) consumers,
producers, and admin clients." +
" The cluster must have a client metrics subscription which
corresponds to a client.";
@@ -1117,6 +1125,11 @@ public class StreamsConfig extends AbstractConfig {
true,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
+ .define(TRANSACTIONAL_STATE_STORES_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ TRANSACTIONAL_STATE_STORES_DOC)
.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
Type.INT,
null,
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index fd76f07686a..45c0653c84b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -195,6 +195,7 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<ProcessingExceptionHandler>
processingExceptionHandlerSupplier;
public final boolean ensureExplicitInternalResourceNaming;
+ public final boolean transactionalStateStoresEnabled;
public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
@@ -306,6 +307,9 @@ public final class TopologyConfig extends AbstractConfig {
}
ensureExplicitInternalResourceNaming =
globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
+ this.transactionalStateStoresEnabled = Boolean.parseBoolean(
+ String.valueOf(globalAppConfigs.originals()
+ .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG,
"false")));
}
@Deprecated
@@ -349,7 +353,8 @@ public final class TopologyConfig extends AbstractConfig {
timestampExtractorSupplier.get(),
deserializationExceptionHandlerSupplier.get(),
processingExceptionHandlerSupplier.get(),
- eosEnabled
+ eosEnabled,
+ transactionalStateStoresEnabled
);
}
@@ -361,6 +366,7 @@ public final class TopologyConfig extends AbstractConfig {
public final DeserializationExceptionHandler
deserializationExceptionHandler;
public final ProcessingExceptionHandler processingExceptionHandler;
public final boolean eosEnabled;
+ public final boolean transactionalStateStoresEnabled;
private TaskConfig(final long maxTaskIdleMs,
final long taskTimeoutMs,
@@ -368,7 +374,8 @@ public final class TopologyConfig extends AbstractConfig {
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler
deserializationExceptionHandler,
final ProcessingExceptionHandler
processingExceptionHandler,
- final boolean eosEnabled) {
+ final boolean eosEnabled,
+ final boolean transactionalStateStoresEnabled) {
this.maxTaskIdleMs = maxTaskIdleMs;
this.taskTimeoutMs = taskTimeoutMs;
this.maxBufferedSize = maxBufferedSize;
@@ -376,6 +383,7 @@ public final class TopologyConfig extends AbstractConfig {
this.deserializationExceptionHandler =
deserializationExceptionHandler;
this.processingExceptionHandler = processingExceptionHandler;
this.eosEnabled = eosEnabled;
+ this.transactionalStateStoresEnabled =
transactionalStateStoresEnabled;
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 96590df0793..5e1a69d29e0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -149,6 +149,7 @@ class ActiveTaskCreator {
taskId,
Task.TaskType.ACTIVE,
eosEnabled(applicationConfig),
+
applicationConfig.getBoolean(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG),
logContext,
stateDirectory,
topology.storeToChangelogTopic(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f25d5268751..d6a5a2fea3c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -196,6 +196,7 @@ public class ProcessorStateManager implements StateManager {
private final TaskId taskId;
private final boolean eosEnabled;
+ private final boolean transactionalStateStoresEnabled;
private final Collection<TopicPartition> sourcePartitions;
private final Map<String, String> storeToChangelogTopic;
@@ -225,6 +226,7 @@ public class ProcessorStateManager implements StateManager {
public ProcessorStateManager(final TaskId taskId,
final TaskType taskType,
final boolean eosEnabled,
+ final boolean transactionalStateStoresEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String>
storeToChangelogTopic,
@@ -236,6 +238,7 @@ public class ProcessorStateManager implements StateManager {
this.taskId = taskId;
this.taskType = taskType;
this.eosEnabled = eosEnabled;
+ this.transactionalStateStoresEnabled = transactionalStateStoresEnabled;
this.sourcePartitions = sourcePartitions;
this.upgradeFrom = upgradeFrom;
@@ -253,11 +256,12 @@ public class ProcessorStateManager implements
StateManager {
public ProcessorStateManager(final TaskId taskId,
final TaskType taskType,
final boolean eosEnabled,
+ final boolean transactionalStateStoresEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String>
storeToChangelogTopic,
final Collection<TopicPartition>
sourcePartitions) throws ProcessorStateException {
- this(taskId, taskType, eosEnabled, logContext, stateDirectory,
storeToChangelogTopic, sourcePartitions, null);
+ this(taskId, taskType, eosEnabled, transactionalStateStoresEnabled,
logContext, stateDirectory, storeToChangelogTopic, sourcePartitions, null);
}
/**
@@ -271,7 +275,7 @@ public class ProcessorStateManager implements StateManager {
final
StateDirectory stateDirectory,
final
Map<String, String> storeToChangelogTopic,
final
Set<TopicPartition> sourcePartitions) {
- return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
+ return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
false, logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
}
void registerStateStores(final List<StateStore> allStores, final
InternalProcessorContext<?, ?> processorContext) {
@@ -337,7 +341,7 @@ public class ProcessorStateManager implements StateManager {
// with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
// and hence we are uncertain that the current local state
only contains committed data;
// in that case we need to treat it as a task-corrupted
exception
- if (eosEnabled && !storeDirIsEmpty) {
+ if (eosEnabled && !storeDirIsEmpty &&
!transactionalStateStoresEnabled) {
log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
"since under EOS it has the risk of getting
uncommitted data in stores we have to " +
"treat it as a task corruption error and wipe
out the local state of task {} " +
@@ -419,6 +423,10 @@ public class ProcessorStateManager implements StateManager
{
return Collections.unmodifiableSet(changelogOffsets().keySet());
}
+ boolean hasCorruptedStores() {
+ return stores.values().stream().anyMatch(m -> m.corrupted);
+ }
+
void markChangelogAsCorrupted(final Collection<TopicPartition> partitions)
{
final Collection<TopicPartition> partitionsToMarkAsCorrupted = new
LinkedList<>(partitions);
for (final StateStoreMetadata storeMetadata : stores.values()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 46015532ba5..1b8f23dae98 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -45,6 +45,7 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
*/
public class StandbyTask extends AbstractTask implements Task {
private final boolean eosEnabled;
+ private final boolean transactionalStateStoresEnabled;
private final Sensor closeTaskSensor;
private final Sensor updateSensor;
private final StreamsMetricsImpl streamsMetrics;
@@ -86,6 +87,7 @@ public class StandbyTask extends AbstractTask implements Task
{
closeTaskSensor =
ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor =
TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(),
streamsMetrics);
this.eosEnabled = config.eosEnabled;
+ this.transactionalStateStoresEnabled =
config.transactionalStateStoresEnabled;
}
@Override
@@ -271,6 +273,7 @@ public class StandbyTask extends AbstractTask implements
Task {
logPrefix,
clean,
eosEnabled,
+ transactionalStateStoresEnabled,
stateMgr,
stateDirectory,
TaskType.STANDBY
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index e990d588fdd..9083ff054e2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -82,6 +82,7 @@ class StandbyTaskCreator {
taskId,
Task.TaskType.STANDBY,
eosEnabled(applicationConfig),
+
applicationConfig.getBoolean(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG),
getLogContext(taskId),
stateDirectory,
topology.storeToChangelogTopic(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 6a02fba3cde..42c0da1c519 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -158,11 +158,15 @@ final class StateManagerUtil {
final String logPrefix,
final boolean closeClean,
final boolean eosEnabled,
+ final boolean
transactionalStateStoresEnabled,
final ProcessorStateManager stateMgr,
final StateDirectory stateDirectory,
final TaskType taskType) {
- // if EOS is enabled, wipe out the whole state store for unclean close
since it is now invalid
- final boolean wipeStateStore = !closeClean && eosEnabled;
+ // if EOS is enabled, wipe out the whole state store for unclean close
since it is now invalid.
+ // With transactional state stores, uncommitted data is never written
to the base store,
+ // so wiping is only needed when stores have been marked as corrupted
(e.g. InvalidOffsetException).
+ final boolean wipeStateStore = !closeClean && eosEnabled
+ && (!transactionalStateStoresEnabled ||
stateMgr.hasCorruptedStores());
final TaskId id = stateMgr.taskId();
log.trace("Closing state manager for {} task {}", taskType, id);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 495098f2a06..3b65e677674 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -80,6 +80,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// there's still an optimization that requires this info to be
// leaked into this class, which is to checkpoint after committing if EOS
is not enabled.
private final boolean eosEnabled;
+ private final boolean transactionalStateStoresEnabled;
private final int maxBufferedSize;
private final AbstractPartitionGroup partitionGroup;
@@ -155,6 +156,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
this.time = time;
this.recordCollector = recordCollector;
this.eosEnabled = config.eosEnabled;
+ this.transactionalStateStoresEnabled =
config.transactionalStateStoresEnabled;
final String threadId = Thread.currentThread().getName();
this.streamsMetrics = streamsMetrics;
@@ -526,7 +528,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
break;
case RUNNING:
- if (enforceCheckpoint || !eosEnabled) {
+ if (enforceCheckpoint || !eosEnabled ||
transactionalStateStoresEnabled) {
maybeCheckpoint();
}
log.debug("Finalized commit for {} task with eos {} enforce
checkpoint {}", state(), eosEnabled, enforceCheckpoint);
@@ -670,6 +672,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
logPrefix,
clean,
eosEnabled,
+ transactionalStateStoresEnabled,
stateMgr,
stateDirectory,
TaskType.ACTIVE
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e8a3442ec13..337e70492c0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -280,6 +280,7 @@ public class TaskManager {
// we need to enforce a checkpoint that removes the
corrupted partitions
if (markAsCorrupted) {
+
task.markChangelogAsCorrupted(task.changelogPartitions());
task.postCommit(true);
}
} catch (final RuntimeException swallow) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index fa54bb6c78c..c8f358d9fe6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -83,6 +83,7 @@ import static
org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFF
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
+import static
org.apache.kafka.streams.StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
@@ -1913,6 +1914,18 @@ public class StreamsConfigTest {
}
}
+ @Test
+ public void shouldDisableTransactionalStateStoresByDefault() {
+
assertFalse(streamsConfig.getBoolean(TRANSACTIONAL_STATE_STORES_CONFIG));
+ }
+
+ @Test
+ public void shouldEnableTransactionalStateStoresWhenConfigured() {
+ props.put(TRANSACTIONAL_STATE_STORES_CONFIG, true);
+ streamsConfig = new StreamsConfig(props);
+
assertTrue(streamsConfig.getBoolean(TRANSACTIONAL_STATE_STORES_CONFIG));
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean
isKey) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 63e7e7c2e23..3e697138ad4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -203,6 +203,7 @@ public class ProcessorStateManagerTest {
taskId,
Task.TaskType.STANDBY,
false,
+ false,
logContext,
stateDirectory,
mkMap(
@@ -223,6 +224,7 @@ public class ProcessorStateManagerTest {
taskId,
Task.TaskType.STANDBY,
false,
+ false,
logContext,
stateDirectory,
mkMap(
@@ -398,6 +400,7 @@ public class ProcessorStateManagerTest {
taskId,
Task.TaskType.ACTIVE,
false,
+ false,
logContext,
stateDirectory,
emptyMap(),
@@ -683,6 +686,7 @@ public class ProcessorStateManagerTest {
taskId,
Task.TaskType.STANDBY,
false,
+ false,
logContext,
stateDirectory,
emptyMap(),
@@ -1261,11 +1265,69 @@ public class ProcessorStateManagerTest {
assertEquals(200L, written.get(persistentStoreTwoPartition));
}
+ @Test
+ public void shouldReportHasCorruptedStores() throws IOException {
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
+ try {
+ stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
+ assertFalse(stateMgr.hasCorruptedStores());
+
stateMgr.markChangelogAsCorrupted(Collections.singleton(persistentStorePartition));
+ assertTrue(stateMgr.hasCorruptedStores());
+ } finally {
+ stateMgr.close();
+ }
+ }
+
+ @Test
+ public void
shouldNotThrowTaskCorruptedWithoutCheckpointAndNonEmptyDirWhenTransactional()
throws IOException {
+ // With transactional state stores + EOS, a missing checkpoint on a
non-empty store dir should NOT
+ // be treated as corruption — uncommitted data is never written to the
base store.
+ final long checkpointOffset = 10L;
+
+ final Map<TopicPartition, Long> offsets = mkMap(
+ mkEntry(persistentStorePartition, checkpointOffset),
+ mkEntry(nonPersistentStorePartition, checkpointOffset),
+ mkEntry(irrelevantPartition, 999L)
+ );
+ checkpoint.write(offsets);
+
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, true, null);
+
+ try {
+ stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
+ stateMgr.registerStore(persistentStoreTwo,
persistentStoreTwo.stateRestoreCallback, null);
+ stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
+
+ // should not throw TaskCorruptedException
+ stateMgr.initializeStoreOffsets(false);
+ } finally {
+ stateMgr.close();
+ }
+ }
+
+ private ProcessorStateManager getStateManager(final Task.TaskType
taskType, final boolean eosEnabled, final boolean
transactionalStateStoresEnabled, final UpgradeFromValues upgradeFrom) {
+ return new ProcessorStateManager(
+ taskId,
+ taskType,
+ eosEnabled,
+ transactionalStateStoresEnabled,
+ logContext,
+ stateDirectory,
+ mkMap(
+ mkEntry(persistentStoreName, persistentStoreTopicName),
+ mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
+ mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
+ ),
+ emptySet(),
+ upgradeFrom);
+ }
+
private ProcessorStateManager getStateManager(final Task.TaskType
taskType, final boolean eosEnabled, final UpgradeFromValues upgradeFrom) {
return new ProcessorStateManager(
taskId,
taskType,
eosEnabled,
+ false,
logContext,
stateDirectory,
mkMap(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8cd65e7aa2d..84d1f218cfc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -390,6 +390,29 @@ public class StandbyTaskTest {
assertEquals(Task.State.CLOSED, task.state());
}
+ @Test
+ public void
shouldNotWipeStateDirOnDirtyCloseWithEosAndTransactionalStateStores() {
+ doNothing().when(stateManager).close();
+ when(stateManager.hasCorruptedStores()).thenReturn(false);
+
+ config = new StreamsConfig(mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2),
+ mkEntry(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, "true")
+ )));
+
+ task = createStandbyTask();
+
+ task.suspend();
+ task.closeDirty();
+
+ assertEquals(Task.State.CLOSED, task.state());
+ // With transactional state stores, the state dir should NOT be wiped
on dirty close
+ // unless stores are specifically marked as corrupted.
+ verify(stateManager, never()).baseDir();
+ }
+
@Test
public void shouldPrepareRecycleSuspendedTask() {
task = createStandbyTask();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 415f6dc52a1..65b779247ee 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -121,7 +121,7 @@ public class StateManagerUtilTest {
when(stateDirectory.lock(taskId)).thenReturn(true);
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
+ "logPrefix:", true, false, false, stateManager, stateDirectory,
TaskType.ACTIVE);
inOrder.verify(stateManager).close();
inOrder.verify(stateDirectory).unlock(taskId);
@@ -136,7 +136,7 @@ public class StateManagerUtilTest {
final ProcessorStateException thrown = assertThrows(
ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE));
+ "logPrefix:", true, false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
@@ -154,7 +154,7 @@ public class StateManagerUtilTest {
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
- logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
+ logger, "logPrefix:", false, false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
verify(stateDirectory).unlock(taskId);
}
@@ -168,7 +168,7 @@ public class StateManagerUtilTest {
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
+ "logPrefix:", false, true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
inOrder.verify(stateManager).close();
inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
@@ -187,7 +187,7 @@ public class StateManagerUtilTest {
try (MockedStatic<Utils> ignored = mockStatic(Utils.class)) {
assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, false, stateManager, stateDirectory, TaskType.ACTIVE));
}
verify(stateDirectory).unlock(taskId);
@@ -206,7 +206,7 @@ public class StateManagerUtilTest {
final ProcessorStateException thrown = assertThrows(
ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE));
+ "logPrefix:", false, true, false, stateManager,
stateDirectory, TaskType.ACTIVE));
assertEquals(IOException.class, thrown.getCause().getClass());
}
@@ -224,7 +224,7 @@ public class StateManagerUtilTest {
when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager,
stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, false, stateManager,
stateDirectory, TaskType.ACTIVE);
inOrder.verify(stateManager).taskId();
inOrder.verify(stateDirectory).lock(taskId);
@@ -241,7 +241,7 @@ public class StateManagerUtilTest {
when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", false, true, false, stateManager,
stateDirectory, TaskType.ACTIVE);
inOrder.verify(stateManager).taskId();
inOrder.verify(stateDirectory).lock(taskId);
@@ -250,4 +250,35 @@ public class StateManagerUtilTest {
verify(stateDirectory, never()).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
+
+ @Test
+ public void
testCloseStateManagerTransactionalDoesNotWipeWhenNoCorruptedStores() {
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.hasCorruptedStores()).thenReturn(false);
+
+ StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, true, stateManager, stateDirectory,
TaskType.ACTIVE);
+
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verify(stateDirectory, never()).removeTaskOffsets(taskId);
+ }
+
+ @Test
+ public void
testCloseStateManagerTransactionalWipesWhenStoresAreCorrupted() {
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.hasCorruptedStores()).thenReturn(true);
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
+
+ StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, true, stateManager, stateDirectory,
TaskType.ACTIVE);
+
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
+ inOrder.verify(stateDirectory).unlock(taskId);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f466493d52e..48639fad16a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -307,6 +307,16 @@ public class StreamTaskTest {
final Class<? extends DeserializationExceptionHandler>
deserializationExceptionHandler,
final Class<? extends ProcessingExceptionHandler>
processingExceptionHandler,
final Class<? extends TimestampExtractor> timestampExtractor) {
+ return createConfig(eosConfig, enforcedProcessingValue,
deserializationExceptionHandler, processingExceptionHandler,
timestampExtractor, false);
+ }
+
+ private static StreamsConfig createConfig(
+ final String eosConfig,
+ final String enforcedProcessingValue,
+ final Class<? extends DeserializationExceptionHandler>
deserializationExceptionHandler,
+ final Class<? extends ProcessingExceptionHandler>
processingExceptionHandler,
+ final Class<? extends TimestampExtractor> timestampExtractor,
+ final boolean transactionalStateStores) {
final String canonicalPath;
try {
canonicalPath = BASE_DIR.getCanonicalPath();
@@ -324,7 +334,8 @@ public class StreamTaskTest {
mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
enforcedProcessingValue),
mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
deserializationExceptionHandler.getName()),
mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
processingExceptionHandler.getName()),
- mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
timestampExtractor.getName())
+ mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
timestampExtractor.getName()),
+ mkEntry(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG,
String.valueOf(transactionalStateStores))
)));
}
@@ -2910,6 +2921,38 @@ public class StreamTaskTest {
verify(recordCollector, never()).offsets();
}
+ @Test
+ public void shouldNotCheckpointOnPostCommitInRunningStateWithEosAndNotEnforced() {
+ final ProcessorStateManager processorStateManager = mockStateManager();
+ recordCollector = mock(RecordCollectorImpl.class);
+
+ task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
+ task.initializeIfNeeded();
+ // completeRestoration does not call commit() under EOS (verified by shouldNotCommitAfterRestorationWhenExactlyOnceEnabled)
+ task.completeRestoration(noOpResetter -> { });
+ task.postCommit(false);
+ // total commit() invocations should remain 0
+ verify(processorStateManager, never()).commit();
+ }
+
+ @Test
+ public void shouldCheckpointOnPostCommitInRunningStateWithEosAndTransactionalStateStores() {
+ final ProcessorStateManager processorStateManager = mockStateManager();
+ recordCollector = mock(RecordCollectorImpl.class);
+
+ task = createStatefulTask(
+ createConfig(EXACTLY_ONCE_V2, "100", LogAndFailExceptionHandler.class, LogAndFailProcessingExceptionHandler.class, FailOnInvalidTimestamp.class, true),
+ true,
+ processorStateManager
+ );
+ task.initializeIfNeeded();
+ // completeRestoration does not call commit() under EOS
+ task.completeRestoration(noOpResetter -> { });
+ task.postCommit(false);
+ // transactionalStateStoresEnabled=true triggers maybeCheckpoint() on postCommit even under EOS
+ verify(processorStateManager).commit();
+ }
+
@Test
public void punctuateShouldNotHandleFailProcessingExceptionAndThrowStreamsException() {
when(stateManager.taskId()).thenReturn(taskId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index baee5286cae..74dd97050ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2471,6 +2471,7 @@ public class TaskManagerTest {
when(corruptedActive.prepareCommit(false)).thenReturn(emptyMap());
when(corruptedActive.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
doNothing().when(corruptedActive).suspend();
+ doNothing().when(corruptedActive).markChangelogAsCorrupted(taskId00ChangelogPartitions);
doNothing().when(corruptedActive).postCommit(true);
doNothing().when(corruptedActive).closeDirty();
doNothing().when(corruptedActive).revive();
@@ -2481,21 +2482,24 @@ public class TaskManagerTest {
taskManager.handleCorruption(singleton(taskId00));
- // 1. verify corrupted task was closed dirty and revived
+ // 1. verify corrupted task was closed dirty and revived; markChangelogAsCorrupted precedes postCommit
final InOrder corruptedOrder = inOrder(corruptedActive, tasks);
corruptedOrder.verify(corruptedActive).prepareCommit(false);
corruptedOrder.verify(corruptedActive).suspend();
+ corruptedOrder.verify(corruptedActive).markChangelogAsCorrupted(taskId00ChangelogPartitions);
corruptedOrder.verify(corruptedActive).postCommit(true);
corruptedOrder.verify(corruptedActive).closeDirty();
corruptedOrder.verify(tasks).removeTask(corruptedActive);
corruptedOrder.verify(corruptedActive).revive();
corruptedOrder.verify(tasks).addPendingTasksToInit(Set.of(corruptedActive));
- // 2. verify uncorrupted task attempted commit, failed with timeout, then was closed dirty and revived
+ // 2. verify uncorrupted task attempted commit, failed with timeout; EOS converts TimeoutException to
+ // TaskCorruptedException so it also ends up in the corrupted path (markAsCorrupted=true)
final InOrder uncorruptedOrder = inOrder(uncorruptedActive, producer, tasks);
uncorruptedOrder.verify(uncorruptedActive).prepareCommit(true);
- uncorruptedOrder.verify(producer).commitTransaction(offsets, groupMetadata); // tries to commit, throws TimeoutException
+ uncorruptedOrder.verify(producer).commitTransaction(offsets, groupMetadata); // throws TimeoutException → TaskCorruptedException
uncorruptedOrder.verify(uncorruptedActive).suspend();
+ uncorruptedOrder.verify(uncorruptedActive).markChangelogAsCorrupted(taskId01ChangelogPartitions);
uncorruptedOrder.verify(uncorruptedActive).postCommit(true);
uncorruptedOrder.verify(uncorruptedActive).closeDirty();
uncorruptedOrder.verify(tasks).removeTask(uncorruptedActive);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 1eaf63d19c8..6bbe268b34f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -655,6 +655,7 @@ public class StreamThreadStateStoreProviderTest {
taskId,
Task.TaskType.ACTIVE,
StreamsConfigUtils.eosEnabled(streamsConfig),
+ false,
logContext,
stateDirectory,
topology.storeToChangelogTopic(),
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 687825b1d10..2738458062a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -486,6 +486,7 @@ public class TopologyTestDriver implements Closeable {
TASK_ID,
Task.TaskType.ACTIVE,
StreamsConfig.EXACTLY_ONCE_V2.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+ streamsConfig.getBoolean(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG),
logContext,
stateDirectory,
processorTopology.storeToChangelogTopic(),