This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee4debb9f0d KAFKA-19128: Kafka Streams should not get offsets when 
close dirty (#19450)
ee4debb9f0d is described below

commit ee4debb9f0dc2749b317654f2e5bcf7bdab35884
Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com>
AuthorDate: Fri Apr 25 06:23:07 2025 +0200

    KAFKA-19128: Kafka Streams should not get offsets when close dirty (#19450)
    
    Kafka Streams calls `prepareCommit()` in `Taskmanager#closeTaskDirty()`.
    However, the dirty task must not get committed and therefore,
    prepare-commit tasks such as getting offsets should not be needed as
    well. The only thing needed before closing a task dirty is flushing.
    Therefore, separating `flush` and `prepareCommit` could be a good fix.
    
    Reviewers: Bill Bejeck <b...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../streams/processor/internals/ReadOnlyTask.java  |  2 +-
 .../streams/processor/internals/StandbyTask.java   | 10 ++-
 .../streams/processor/internals/StreamTask.java    |  7 +-
 .../kafka/streams/processor/internals/Task.java    |  2 +-
 .../streams/processor/internals/TaskExecutor.java  |  2 +-
 .../streams/processor/internals/TaskManager.java   | 12 ++--
 .../processor/internals/StandbyTaskTest.java       | 14 ++--
 .../processor/internals/StreamTaskTest.java        | 82 +++++++++++++---------
 .../processor/internals/TaskManagerTest.java       | 49 +++++++------
 .../internals/tasks/DefaultTaskExecutorTest.java   |  2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  6 +-
 11 files changed, 107 insertions(+), 81 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index a895b71e4e9..dd5a2c6e1d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -180,7 +180,7 @@ public class ReadOnlyTask implements Task {
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean 
clean) {
         throw new UnsupportedOperationException("This task is read-only");
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 417f754ca2c..4c6e6674bdb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -179,7 +179,7 @@ public class StandbyTask extends AbstractTask implements 
Task {
      *                          or flushing state store get IO errors; such 
error should cause the thread to die
      */
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean 
clean) {
         switch (state()) {
             case CREATED:
                 log.debug("Skipped preparing created task for commit");
@@ -189,7 +189,11 @@ public class StandbyTask extends AbstractTask implements 
Task {
             case RUNNING:
             case SUSPENDED:
                 // do not need to flush state store caches in pre-commit since 
nothing would be sent for standby tasks
-                log.debug("Prepared {} task for committing", state());
+                if (!clean) {
+                    log.debug("Skipped preparing {} standby task with id {} 
for commit since the task is getting closed dirty.", state(), id);
+                } else {
+                    log.debug("Prepared {} task for committing", state());
+                }
 
                 break;
 
@@ -197,7 +201,7 @@ public class StandbyTask extends AbstractTask implements 
Task {
                 throw new IllegalStateException("Illegal state " + state() + " 
while preparing standby task " + id + " for committing ");
         }
 
-        return Collections.emptyMap();
+        return clean ? Collections.emptyMap() : null;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 424d6f7af61..93737d82289 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -417,7 +417,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         timeCurrentIdlingStarted = Optional.empty();
     }
 
-
     public void flush() {
         stateMgr.flushCache();
         recordCollector.flush();
@@ -429,7 +428,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
      * @return offsets that should be committed for this task
      */
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean 
clean) {
         switch (state()) {
             case CREATED:
             case RESTORING:
@@ -444,6 +443,10 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                     //
                     // TODO: this should be removed after we decouple caching 
with emitting
                     flush();
+                    if (!clean) {
+                        log.debug("Skipped preparing {} task with id {} for 
commit since the task is getting closed dirty.", state(), id);
+                        return null;
+                    }
                     hasPendingTxCommit = eosEnabled;
 
                     log.debug("Prepared {} task for committing", state());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 484c1ca574b..ba09700af8a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -201,7 +201,7 @@ public interface Task {
     /**
      * @throws StreamsException fatal error, should close the thread
      */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean);
 
     void postCommit(boolean enforceCheckpoint);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index c993787503e..91deab0dd9d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -142,7 +142,7 @@ public class TaskExecutor {
         for (final Task task : tasksToCommit) {
             // we need to call commitNeeded first since we need to update 
committable offsets
             if (task.commitNeeded()) {
-                final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata 
= task.prepareCommit();
+                final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata 
= task.prepareCommit(true);
                 if (!offsetAndMetadata.isEmpty()) {
                     consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
                 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index eccf0c8f33d..9376e6887f3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -278,7 +278,7 @@ public class TaskManager {
                     // we do not need to take the returned offsets since we 
are not going to commit anyways;
                     // this call is only used for active tasks to flush the 
cache before suspending and
                     // closing the topology
-                    task.prepareCommit();
+                    task.prepareCommit(false);
                 } catch (final RuntimeException swallow) {
                     log.warn("Error flushing cache for corrupted task {}. " +
                         "Since the task is closing dirty, the following 
exception is swallowed: {}",
@@ -812,7 +812,7 @@ public class TaskManager {
                 //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
                 // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
                 //    write the checkpoint file.
-                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit(true);
                 if (!offsets.isEmpty()) {
                     log.error("Task {} should have been committed when it was 
suspended, but it reports non-empty " +
                                     "offsets {} to commit; this means it 
failed during last commit and hence should be closed dirty",
@@ -1264,7 +1264,7 @@ public class TaskManager {
                                                  final Map<Task, 
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
         for (final Task task : tasksToPrepare) {
             try {
-                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit(true);
                 if (!committableOffsets.isEmpty()) {
                     consumedOffsetsPerTask.put(task, committableOffsets);
                 }
@@ -1479,7 +1479,7 @@ public class TaskManager {
         try {
             // we call this function only to flush the case if necessary
             // before suspending and closing the topology
-            task.prepareCommit();
+            task.prepareCommit(false);
         } catch (final RuntimeException swallow) {
             log.warn("Error flushing cache of dirty task {}. " +
                 "Since the task is closing dirty, the following exception is 
swallowed: {}",
@@ -1630,7 +1630,7 @@ public class TaskManager {
         // first committing all tasks and then suspend and close them clean
         for (final Task task : activeTasksToClose) {
             try {
-                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit(true);
                 tasksToCommit.add(task);
                 if (!committableOffsets.isEmpty()) {
                     consumedOffsetsAndMetadataPerTask.put(task, 
committableOffsets);
@@ -1719,7 +1719,7 @@ public class TaskManager {
         // first committing and then suspend / close clean
         for (final Task task : standbyTasksToClose) {
             try {
-                task.prepareCommit();
+                task.prepareCommit(true);
                 task.postCommit(true);
                 task.suspend();
                 closeTaskClean(task);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e953a61fc1f..768f3787d0b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -213,7 +213,7 @@ public class StandbyTaskTest {
         task.suspend();
         task.closeClean();
 
-        assertThrows(IllegalStateException.class, task::prepareCommit);
+        assertThrows(IllegalStateException.class, () -> 
task.prepareCommit(true));
     }
 
     @Test
@@ -261,13 +261,13 @@ public class StandbyTaskTest {
 
         task = createStandbyTask();
         task.initializeIfNeeded();
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false);  // this should not checkpoint
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false);  // this should checkpoint
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false);  // this should not checkpoint
 
         verify(stateManager).checkpoint();
@@ -322,7 +322,7 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
         task.suspend();
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(true);
         task.closeClean();
 
@@ -360,7 +360,7 @@ public class StandbyTaskTest {
         // could commit if the offset advanced beyond threshold
         assertTrue(task.commitNeeded());
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(true);
     }
 
@@ -389,7 +389,7 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertThrows(RuntimeException.class, () -> task.postCommit(true));
 
         assertEquals(RUNNING, task.state());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bcf24ee7df8..98807cd6342 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -645,6 +645,22 @@ public class StreamTaskTest {
         assertEquals(asList(201, 202, 203), source2.values);
     }
 
+    @Test
+    public void shouldNotGetOffsetsIfPrepareCommitDirty() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatefulTask(createConfig("100"), false);
+
+        task.addRecords(partition1, 
List.of(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+        task.addRecords(partition2, 
List.of(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
+
+        assertTrue(task.process(0L));
+        assertTrue(task.commitNeeded());
+
+        // committableOffsetsAndMetadata() has not been called, otherwise 
prepareCommit() would have returned a map
+        assertNull(task.prepareCommit(false));
+    }
+
     @Test
     public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
         when(stateManager.taskId()).thenReturn(taskId);
@@ -660,7 +676,7 @@ public class StreamTaskTest {
         ));
 
         assertTrue(task.process(time.milliseconds()));
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertTrue(task.process(time.milliseconds()));
         task.postCommit(false);
         assertTrue(task.process(time.milliseconds()));
@@ -683,7 +699,7 @@ public class StreamTaskTest {
         ));
 
         assertTrue(task.process(time.milliseconds()));
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertFalse(task.process(time.milliseconds()));
         task.postCommit(false);
         assertTrue(task.process(time.milliseconds()));
@@ -1328,7 +1344,7 @@ public class StreamTaskTest {
         assertTrue(task.process(0L));
         assertTrue(task.commitNeeded());
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertTrue(task.commitNeeded());
 
         task.postCommit(true);
@@ -1338,7 +1354,7 @@ public class StreamTaskTest {
         assertTrue(task.maybePunctuateStreamTime());
         assertTrue(task.commitNeeded());
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertTrue(task.commitNeeded());
 
         task.postCommit(true);
@@ -1349,7 +1365,7 @@ public class StreamTaskTest {
         assertTrue(task.maybePunctuateSystemTime());
         assertTrue(task.commitNeeded());
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertTrue(task.commitNeeded());
 
         task.postCommit(true);
@@ -1374,7 +1390,7 @@ public class StreamTaskTest {
         task.process(0L);
         processorSystemTime.mockProcessor.addProcessorMetadata("key2", 200L);
 
-        final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = 
task.prepareCommit();
+        final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = 
task.prepareCommit(true);
         final TopicPartitionMetadata expected = new TopicPartitionMetadata(3L,
             new ProcessorMetadata(
                 mkMap(
@@ -1413,7 +1429,7 @@ public class StreamTaskTest {
         final TopicPartitionMetadata metadata = new TopicPartitionMetadata(0, 
new ProcessorMetadata());
 
         assertTrue(task.commitNeeded());
-        assertThat(task.prepareCommit(), equalTo(
+        assertThat(task.prepareCommit(true), equalTo(
                 mkMap(
                         mkEntry(partition1, new OffsetAndMetadata(3L, 
Optional.of(2), metadata.encode()))
                 )
@@ -1430,7 +1446,7 @@ public class StreamTaskTest {
         task.process(0L);
 
         assertTrue(task.commitNeeded());
-        assertThat(task.prepareCommit(), equalTo(
+        assertThat(task.prepareCommit(true), equalTo(
             mkMap(
                 mkEntry(partition1, new OffsetAndMetadata(3L, Optional.of(2), 
metadata.encode())),
                 mkEntry(partition2, new OffsetAndMetadata(1L, Optional.of(0), 
metadata.encode()))
@@ -1486,7 +1502,7 @@ public class StreamTaskTest {
 
         assertTrue(task.commitNeeded());
 
-        assertThat(task.prepareCommit(), equalTo(
+        assertThat(task.prepareCommit(true), equalTo(
             mkMap(
                 mkEntry(partition1, new OffsetAndMetadata(1L,  Optional.of(1), 
expectedMetadata1.encode())),
                 mkEntry(partition2, new OffsetAndMetadata(2L, Optional.of(1), 
expectedMetadata2.encode()))
@@ -1509,7 +1525,7 @@ public class StreamTaskTest {
         assertTrue(task.commitNeeded());
 
         // Processor metadata not updated, we just need to commit to 
partition1 again with new offset
-        assertThat(task.prepareCommit(), equalTo(
+        assertThat(task.prepareCommit(true), equalTo(
                 mkMap(mkEntry(partition1, new OffsetAndMetadata(2L, 
Optional.of(1), expectedMetadata3.encode())))
         ));
         task.postCommit(false);
@@ -1526,7 +1542,7 @@ public class StreamTaskTest {
 
         final IllegalStateException thrown = assertThrows(
             IllegalStateException.class,
-            task::prepareCommit
+            () -> task.prepareCommit(true)
         );
 
         assertThat(thrown.getMessage(), is("Illegal state CLOSED while 
preparing active task 0_0 for committing"));
@@ -1820,10 +1836,10 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { }); // should checkpoint
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(true); // should checkpoint
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false); // should not checkpoint
 
         assertThat("Map was empty", task.highWaterMark().size() == 2);
@@ -1847,10 +1863,10 @@ public class StreamTaskTest {
 
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { }); // should checkpoint
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(true); // should checkpoint
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false); // should checkpoint since the offset delta is 
greater than the threshold
 
         assertThat("Map was empty", task.highWaterMark().size() == 2);
@@ -1866,7 +1882,7 @@ public class StreamTaskTest {
 
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false);
         final File checkpointFile = new File(
             stateDirectory.getOrCreateDirectoryForTask(taskId),
@@ -2011,7 +2027,7 @@ public class StreamTaskTest {
         assertTrue(task.process(0L));
         assertTrue(task.process(0L));
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         if (doCommit) {
             task.updateCommittedOffsets(repartition, 10L);
         }
@@ -2050,7 +2066,7 @@ public class StreamTaskTest {
 
         task.transitionTo(SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
-        assertThrows(IllegalStateException.class, task::prepareCommit);
+        assertThrows(IllegalStateException.class, () -> 
task.prepareCommit(true));
     }
 
     @Test
@@ -2101,7 +2117,7 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false);
 
         task.suspend();
@@ -2123,7 +2139,7 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { }); // should checkpoint
 
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false); // should checkpoint since the offset delta is 
greater than the threshold
 
         task.suspend();
@@ -2207,7 +2223,7 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { }); // should flush and 
checkpoint
         task.suspend();
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(true); // should flush and checkpoint
         task.closeClean();
 
@@ -2277,7 +2293,7 @@ public class StreamTaskTest {
         assertTrue(task.commitNeeded());
 
         task.suspend();
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(false);
 
         assertEquals(SUSPENDED, task.state());
@@ -2307,7 +2323,7 @@ public class StreamTaskTest {
         assertTrue(task.commitNeeded());
 
         task.suspend();
-        task.prepareCommit();
+        task.prepareCommit(true);
         task.postCommit(true); // should checkpoint
         assertThrows(ProcessorStateException.class, () -> task.closeClean());
 
@@ -2336,7 +2352,7 @@ public class StreamTaskTest {
         task.addRecords(partition1, 
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
         task.process(100L);
 
-        assertThrows(ProcessorStateException.class, task::prepareCommit);
+        assertThrows(ProcessorStateException.class, () -> 
task.prepareCommit(true));
 
         assertEquals(RUNNING, task.state());
 
@@ -2369,7 +2385,7 @@ public class StreamTaskTest {
         assertTrue(task.commitNeeded());
 
         task.suspend();
-        task.prepareCommit();
+        task.prepareCommit(true);
         assertThrows(ProcessorStateException.class, () -> 
task.postCommit(true));
 
         assertEquals(Task.State.SUSPENDED, task.state());
@@ -2672,7 +2688,7 @@ public class StreamTaskTest {
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1,
                 new OffsetAndMetadata(offset + 1,
                     new TopicPartitionMetadata(RecordQueue.UNKNOWN, new 
ProcessorMetadata()).encode()))))
@@ -2704,7 +2720,7 @@ public class StreamTaskTest {
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 
1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))))
         );
     }
@@ -2734,14 +2750,14 @@ public class StreamTaskTest {
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new 
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
         );
 
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new 
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
         );
     }
@@ -2771,7 +2787,7 @@ public class StreamTaskTest {
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1,
                 new OffsetAndMetadata(offset + 1,
                     new TopicPartitionMetadata(RecordQueue.UNKNOWN, new 
ProcessorMetadata()).encode()))))
@@ -2803,7 +2819,7 @@ public class StreamTaskTest {
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 
1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))))
         );
     }
@@ -2834,14 +2850,14 @@ public class StreamTaskTest {
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new 
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
         );
 
         assertTrue(task.process(offset));
         assertTrue(task.commitNeeded());
         assertThat(
-            task.prepareCommit(),
+            task.prepareCommit(true),
             equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new 
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 9d7df53adbe..d8bb35c000a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -459,7 +459,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
-        verify(activeTaskToClose).prepareCommit();
+        verify(activeTaskToClose).prepareCommit(false);
         verify(activeTaskToClose).suspend();
         verify(activeTaskToClose).closeDirty();
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
@@ -500,7 +500,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
-        verify(standbyTaskToClose).prepareCommit();
+        verify(standbyTaskToClose).prepareCommit(false);
         verify(standbyTaskToClose).suspend();
         verify(standbyTaskToClose).closeDirty();
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
@@ -996,7 +996,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
-        verify(activeTaskToRecycle).prepareCommit();
+        verify(activeTaskToRecycle).prepareCommit(true);
         verify(tasks).addPendingTasksToInit(Set.of(standbyTask));
         verify(tasks).removeTask(activeTaskToRecycle);
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
@@ -1019,7 +1019,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
-        verify(activeTaskToRecycle).prepareCommit();
+        verify(activeTaskToRecycle).prepareCommit(true);
         verify(tasks).replaceActiveWithStandby(standbyTask);
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@@ -1059,7 +1059,7 @@ public class TaskManagerTest {
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
-        verify(activeTaskToClose).prepareCommit();
+        verify(activeTaskToClose).prepareCommit(true);
         verify(activeTaskToClose).closeClean();
         verify(tasks).removeTask(activeTaskToClose);
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@@ -1536,10 +1536,10 @@ public class TaskManagerTest {
 
         taskManager.handleLostAll();
 
-        verify(task1).prepareCommit();
+        verify(task1).prepareCommit(false);
         verify(task1).suspend();
         verify(task1).closeDirty();
-        verify(task2).prepareCommit();
+        verify(task2).prepareCommit(false);
         verify(task2).suspend();
         verify(task2).closeDirty();
     }
@@ -1569,7 +1569,7 @@ public class TaskManagerTest {
 
         verify(task1).suspend();
         verify(task1).closeClean();
-        verify(task2).prepareCommit();
+        verify(task2).prepareCommit(false);
         verify(task2).suspend();
         verify(task2).closeDirty();
         verify(task3).suspend();
@@ -2386,10 +2386,10 @@ public class TaskManagerTest {
         taskManager.handleCorruption(Set.of(taskId02));
 
         verify(activeRestoringTask, never()).commitNeeded();
-        verify(activeRestoringTask, never()).prepareCommit();
+        verify(activeRestoringTask, never()).prepareCommit(true);
         verify(activeRestoringTask, never()).postCommit(anyBoolean());
         verify(standbyTask, never()).commitNeeded();
-        verify(standbyTask, never()).prepareCommit();
+        verify(standbyTask, never()).prepareCommit(true);
         verify(standbyTask, never()).postCommit(anyBoolean());
     }
 
@@ -2418,9 +2418,9 @@ public class TaskManagerTest {
         taskManager.handleCorruption(Set.of(taskId02));
 
         verify(activeRestoringTask, never()).commitNeeded();
-        verify(activeRestoringTask, never()).prepareCommit();
+        verify(activeRestoringTask, never()).prepareCommit(true);
         verify(activeRestoringTask, never()).postCommit(anyBoolean());
-        verify(standbyTask).prepareCommit();
+        verify(standbyTask).prepareCommit(true);
         verify(standbyTask).postCommit(anyBoolean());
     }
 
@@ -2431,7 +2431,7 @@ public class TaskManagerTest {
         final StateMachineTask corruptedStandby = new 
StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
         final StateMachineTask runningNonCorruptedActive = new 
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
             @Override
-            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+            public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
                 throw new TaskMigratedException("You dropped out of the 
group!", new RuntimeException());
             }
         };
@@ -3394,7 +3394,7 @@ public class TaskManagerTest {
 
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager) {
             @Override
-            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+            public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
                 throw new RuntimeException("task 0_1 prepare commit boom!");
             }
         };
@@ -3560,7 +3560,7 @@ public class TaskManagerTest {
 
         verify(activeTaskCreator).close();
         verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
-        verify(failedStatefulTask).prepareCommit();
+        verify(failedStatefulTask).prepareCommit(false);
         verify(failedStatefulTask).suspend();
         verify(failedStatefulTask).closeDirty();
     }
@@ -3634,16 +3634,16 @@ public class TaskManagerTest {
         verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
         verify(tasks).addTask(removedStatefulTask);
         verify(tasks).addTask(removedStandbyTask);
-        verify(removedFailedStatefulTask).prepareCommit();
+        verify(removedFailedStatefulTask).prepareCommit(false);
         verify(removedFailedStatefulTask).suspend();
         verify(removedFailedStatefulTask).closeDirty();
-        verify(removedFailedStandbyTask).prepareCommit();
+        verify(removedFailedStandbyTask).prepareCommit(false);
         verify(removedFailedStandbyTask).suspend();
         verify(removedFailedStandbyTask).closeDirty();
-        verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
+        verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(false);
         verify(removedFailedStatefulTaskDuringRemoval).suspend();
         verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
-        verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
+        verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(false);
         verify(removedFailedStandbyTaskDuringRemoval).suspend();
         verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
     }
@@ -3869,7 +3869,7 @@ public class TaskManagerTest {
     public void shouldPropagateExceptionFromActiveCommit() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
             @Override
-            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+            public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
                 throw new RuntimeException("opsh.");
             }
         };
@@ -3893,7 +3893,7 @@ public class TaskManagerTest {
     public void shouldPropagateExceptionFromStandbyCommit() {
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager) {
             @Override
-            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+            public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
                 throw new RuntimeException("opsh.");
             }
         };
@@ -4689,7 +4689,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTask = mock(StandbyTask.class);
         when(standbyTask.id()).thenReturn(taskId00);
         when(standbyTask.isActive()).thenReturn(false);
-        when(standbyTask.prepareCommit()).thenReturn(Collections.emptyMap());
+        
when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap());
 
         final StreamTask activeTask = mock(StreamTask.class);
         when(activeTask.id()).thenReturn(taskId00);
@@ -4939,10 +4939,13 @@ public class TaskManagerTest {
         }
 
         @Override
-        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
             commitPrepared = true;
 
             if (commitNeeded) {
+                if (!clean) {
+                    return null;
+                }
                 return committableOffsets;
             } else {
                 return Collections.emptyMap();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index aac2dd36b49..d43670429b1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -66,7 +66,7 @@ public class DefaultTaskExecutorTest {
         when(task.isProcessable(anyLong())).thenReturn(true);
         when(task.id()).thenReturn(new TaskId(0, 0, "A"));
         when(task.process(anyLong())).thenReturn(true);
-        when(task.prepareCommit()).thenReturn(Collections.emptyMap());
+        when(task.prepareCommit(true)).thenReturn(Collections.emptyMap());
     }
 
     @AfterEach
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a4cee67ad5f..81c90d043ce 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -591,7 +591,7 @@ public class TopologyTestDriver implements Closeable {
                 // Process the record ...
                 task.process(mockWallClockTime.milliseconds());
                 task.maybePunctuateStreamTime();
-                commit(task.prepareCommit());
+                commit(task.prepareCommit(true));
                 task.postCommit(true);
                 captureOutputsAndReEnqueueInternalResults();
             }
@@ -709,7 +709,7 @@ public class TopologyTestDriver implements Closeable {
         mockWallClockTime.sleep(advance.toMillis());
         if (task != null) {
             task.maybePunctuateSystemTime();
-            commit(task.prepareCommit());
+            commit(task.prepareCommit(true));
             task.postCommit(true);
         }
         completeAllProcessableWork();
@@ -1130,7 +1130,7 @@ public class TopologyTestDriver implements Closeable {
     public void close() {
         if (task != null) {
             task.suspend();
-            task.prepareCommit();
+            task.prepareCommit(true);
             task.postCommit(true);
             task.closeClean();
         }


Reply via email to