This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new ee4debb9f0d KAFKA-19128: Kafka Streams should not get offsets when close dirty (#19450)

ee4debb9f0d is described below

commit ee4debb9f0dc2749b317654f2e5bcf7bdab35884
Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com>
AuthorDate: Fri Apr 25 06:23:07 2025 +0200

    KAFKA-19128: Kafka Streams should not get offsets when close dirty (#19450)

    Kafka Streams calls `prepareCommit()` in `TaskManager#closeTaskDirty()`.
    However, a dirty task must not be committed, so prepare-commit work such
    as computing offsets is not needed either. The only thing needed before
    closing a task dirty is flushing. Therefore, separating `flush` from
    `prepareCommit` is a good fix.

    Reviewers: Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../streams/processor/internals/ReadOnlyTask.java  |  2 +-
 .../streams/processor/internals/StandbyTask.java   | 10 ++-
 .../streams/processor/internals/StreamTask.java    |  7 +-
 .../kafka/streams/processor/internals/Task.java    |  2 +-
 .../streams/processor/internals/TaskExecutor.java  |  2 +-
 .../streams/processor/internals/TaskManager.java   | 12 ++--
 .../processor/internals/StandbyTaskTest.java       | 14 ++--
 .../processor/internals/StreamTaskTest.java        | 82 +++++++++++++---------
 .../processor/internals/TaskManagerTest.java       | 49 +++++++------
 .../internals/tasks/DefaultTaskExecutorTest.java   |  2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  6 +-
 11 files changed, 107 insertions(+), 81 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index a895b71e4e9..dd5a2c6e1d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -180,7 +180,7 @@ public class ReadOnlyTask implements Task {
     }
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
         throw new UnsupportedOperationException("This task is read-only");
     }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 417f754ca2c..4c6e6674bdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -179,7 +179,7 @@ public class StandbyTask extends AbstractTask implements Task {
      * or flushing state store get IO errors; such error should cause the thread to die
      */
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
         switch (state()) {
             case CREATED:
                 log.debug("Skipped preparing created task for commit");
@@ -189,7 +189,11 @@ public class StandbyTask extends AbstractTask implements Task {
             case RUNNING:
             case SUSPENDED:
                 // do not need to flush state store caches in pre-commit since nothing would be sent for standby tasks
-                log.debug("Prepared {} task for committing", state());
+                if (!clean) {
+                    log.debug("Skipped preparing {} standby task with id {} for commit since the task is getting closed dirty.", state(), id);
+                } else {
+                    log.debug("Prepared {} task for committing",
state()); + } break; @@ -197,7 +201,7 @@ public class StandbyTask extends AbstractTask implements Task { throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); } - return Collections.emptyMap(); + return clean ? Collections.emptyMap() : null; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 424d6f7af61..93737d82289 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -417,7 +417,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, timeCurrentIdlingStarted = Optional.empty(); } - public void flush() { stateMgr.flushCache(); recordCollector.flush(); @@ -429,7 +428,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, * @return offsets that should be committed for this task */ @Override - public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { + public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) { switch (state()) { case CREATED: case RESTORING: @@ -444,6 +443,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, // // TODO: this should be removed after we decouple caching with emitting flush(); + if (!clean) { + log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id); + return null; + } hasPendingTxCommit = eosEnabled; log.debug("Prepared {} task for committing", state()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 484c1ca574b..ba09700af8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -201,7 +201,7 @@ public interface Task { /** * @throws StreamsException fatal error, should close the thread */ - Map<TopicPartition, OffsetAndMetadata> prepareCommit(); + Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean); void postCommit(boolean enforceCheckpoint); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java index c993787503e..91deab0dd9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java @@ -142,7 +142,7 @@ public class TaskExecutor { for (final Task task : tasksToCommit) { // we need to call commitNeeded first since we need to update committable offsets if (task.commitNeeded()) { - final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit(); + final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit(true); if (!offsetAndMetadata.isEmpty()) { consumedOffsetsAndMetadata.put(task, offsetAndMetadata); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index eccf0c8f33d..9376e6887f3 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -278,7 +278,7 @@ public class TaskManager { // we do not need to take the returned offsets since we are not going to commit anyways; // this call is only used for active tasks to flush the cache before suspending and // closing the topology - task.prepareCommit(); + task.prepareCommit(false); } catch (final RuntimeException swallow) { log.warn("Error flushing cache for corrupted task {}. " + "Since the task is closing dirty, the following exception is swallowed: {}", @@ -812,7 +812,7 @@ public class TaskManager { // and their changelog positions should not change at all postCommit would not write the checkpoint again. // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably // write the checkpoint file. - final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit(); + final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit(true); if (!offsets.isEmpty()) { log.error("Task {} should have been committed when it was suspended, but it reports non-empty " + "offsets {} to commit; this means it failed during last commit and hence should be closed dirty", @@ -1264,7 +1264,7 @@ public class TaskManager { final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) { for (final Task task : tasksToPrepare) { try { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(true); if (!committableOffsets.isEmpty()) { consumedOffsetsPerTask.put(task, committableOffsets); } @@ -1479,7 +1479,7 @@ public class TaskManager { try { // we call this function only to flush the case if necessary // before suspending and closing the topology - task.prepareCommit(); + task.prepareCommit(false); } catch (final RuntimeException swallow) { log.warn("Error flushing cache of dirty task {}. 
" + "Since the task is closing dirty, the following exception is swallowed: {}", @@ -1630,7 +1630,7 @@ public class TaskManager { // first committing all tasks and then suspend and close them clean for (final Task task : activeTasksToClose) { try { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(true); tasksToCommit.add(task); if (!committableOffsets.isEmpty()) { consumedOffsetsAndMetadataPerTask.put(task, committableOffsets); @@ -1719,7 +1719,7 @@ public class TaskManager { // first committing and then suspend / close clean for (final Task task : standbyTasksToClose) { try { - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); task.suspend(); closeTaskClean(task); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index e953a61fc1f..768f3787d0b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -213,7 +213,7 @@ public class StandbyTaskTest { task.suspend(); task.closeClean(); - assertThrows(IllegalStateException.class, task::prepareCommit); + assertThrows(IllegalStateException.class, () -> task.prepareCommit(true)); } @Test @@ -261,13 +261,13 @@ public class StandbyTaskTest { task = createStandbyTask(); task.initializeIfNeeded(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // this should not checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // this should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // this should not checkpoint verify(stateManager).checkpoint(); @@ -322,7 +322,7 @@ public class StandbyTaskTest { task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); task.closeClean(); @@ -360,7 +360,7 @@ public class StandbyTaskTest { // could commit if the offset advanced beyond threshold assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); } @@ -389,7 +389,7 @@ public class StandbyTaskTest { task = createStandbyTask(); task.initializeIfNeeded(); - task.prepareCommit(); + task.prepareCommit(true); assertThrows(RuntimeException.class, () -> task.postCommit(true)); assertEquals(RUNNING, task.state()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index bcf24ee7df8..98807cd6342 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -645,6 +645,22 @@ public class StreamTaskTest { assertEquals(asList(201, 202, 203), source2.values); } + @Test + public void shouldNotGetOffsetsIfPrepareCommitDirty() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatefulTask(createConfig("100"), false); + + task.addRecords(partition1, List.of(getConsumerRecordWithOffsetAsTimestamp(partition1, 0))); + task.addRecords(partition2, 
List.of(getConsumerRecordWithOffsetAsTimestamp(partition2, 0))); + + assertTrue(task.process(0L)); + assertTrue(task.commitNeeded()); + + // committableOffsetsAndMetadata() has not been called, otherwise prepareCommit() would have returned a map + assertNull(task.prepareCommit(false)); + } + @Test public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() { when(stateManager.taskId()).thenReturn(taskId); @@ -660,7 +676,7 @@ public class StreamTaskTest { )); assertTrue(task.process(time.milliseconds())); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.process(time.milliseconds())); task.postCommit(false); assertTrue(task.process(time.milliseconds())); @@ -683,7 +699,7 @@ public class StreamTaskTest { )); assertTrue(task.process(time.milliseconds())); - task.prepareCommit(); + task.prepareCommit(true); assertFalse(task.process(time.milliseconds())); task.postCommit(false); assertTrue(task.process(time.milliseconds())); @@ -1328,7 +1344,7 @@ public class StreamTaskTest { assertTrue(task.process(0L)); assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.commitNeeded()); task.postCommit(true); @@ -1338,7 +1354,7 @@ public class StreamTaskTest { assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.commitNeeded()); task.postCommit(true); @@ -1349,7 +1365,7 @@ public class StreamTaskTest { assertTrue(task.maybePunctuateSystemTime()); assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.commitNeeded()); task.postCommit(true); @@ -1374,7 +1390,7 @@ public class StreamTaskTest { task.process(0L); processorSystemTime.mockProcessor.addProcessorMetadata("key2", 200L); - final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = task.prepareCommit(); + final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = task.prepareCommit(true); final TopicPartitionMetadata expected = new TopicPartitionMetadata(3L, new ProcessorMetadata( mkMap( @@ -1413,7 +1429,7 @@ public class StreamTaskTest { final TopicPartitionMetadata metadata = new TopicPartitionMetadata(0, new ProcessorMetadata()); assertTrue(task.commitNeeded()); - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap( mkEntry(partition1, new OffsetAndMetadata(3L, Optional.of(2), metadata.encode())) ) @@ -1430,7 +1446,7 @@ public class StreamTaskTest { task.process(0L); assertTrue(task.commitNeeded()); - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap( mkEntry(partition1, new OffsetAndMetadata(3L, Optional.of(2), metadata.encode())), mkEntry(partition2, new OffsetAndMetadata(1L, Optional.of(0), metadata.encode())) @@ -1486,7 +1502,7 @@ public class StreamTaskTest { assertTrue(task.commitNeeded()); - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap( mkEntry(partition1, new OffsetAndMetadata(1L, Optional.of(1), expectedMetadata1.encode())), mkEntry(partition2, new OffsetAndMetadata(2L, Optional.of(1), expectedMetadata2.encode())) @@ -1509,7 +1525,7 @@ public class StreamTaskTest { assertTrue(task.commitNeeded()); // Processor metadata not updated, we just need to commit to partition1 again with new offset - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap(mkEntry(partition1, new OffsetAndMetadata(2L, Optional.of(1), expectedMetadata3.encode()))) )); 
task.postCommit(false); @@ -1526,7 +1542,7 @@ public class StreamTaskTest { final IllegalStateException thrown = assertThrows( IllegalStateException.class, - task::prepareCommit + () -> task.prepareCommit(true) ); assertThat(thrown.getMessage(), is("Illegal state CLOSED while preparing active task 0_0 for committing")); @@ -1820,10 +1836,10 @@ public class StreamTaskTest { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // should not checkpoint assertThat("Map was empty", task.highWaterMark().size() == 2); @@ -1847,10 +1863,10 @@ public class StreamTaskTest { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold assertThat("Map was empty", task.highWaterMark().size() == 2); @@ -1866,7 +1882,7 @@ public class StreamTaskTest { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); final File checkpointFile = new File( stateDirectory.getOrCreateDirectoryForTask(taskId), @@ -2011,7 +2027,7 @@ public class StreamTaskTest { assertTrue(task.process(0L)); assertTrue(task.process(0L)); - task.prepareCommit(); + task.prepareCommit(true); if (doCommit) { task.updateCommittedOffsets(repartition, 10L); } @@ -2050,7 +2066,7 @@ public class StreamTaskTest { task.transitionTo(SUSPENDED); task.transitionTo(Task.State.CLOSED); - assertThrows(IllegalStateException.class, task::prepareCommit); + assertThrows(IllegalStateException.class, () -> task.prepareCommit(true)); } @Test @@ -2101,7 +2117,7 @@ public class StreamTaskTest { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); task.suspend(); @@ -2123,7 +2139,7 @@ public class StreamTaskTest { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold task.suspend(); @@ -2207,7 +2223,7 @@ public class StreamTaskTest { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should flush and checkpoint task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should flush and checkpoint task.closeClean(); @@ -2277,7 +2293,7 @@ public class StreamTaskTest { assertTrue(task.commitNeeded()); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); assertEquals(SUSPENDED, task.state()); @@ -2307,7 +2323,7 @@ public class StreamTaskTest { assertTrue(task.commitNeeded()); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should checkpoint assertThrows(ProcessorStateException.class, () -> task.closeClean()); @@ -2336,7 +2352,7 @@ public class StreamTaskTest { task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset))); task.process(100L); - assertThrows(ProcessorStateException.class, task::prepareCommit); + 
assertThrows(ProcessorStateException.class, () -> task.prepareCommit(true)); assertEquals(RUNNING, task.state()); @@ -2369,7 +2385,7 @@ public class StreamTaskTest { assertTrue(task.commitNeeded()); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); assertThrows(ProcessorStateException.class, () -> task.postCommit(true)); assertEquals(Task.State.SUSPENDED, task.state()); @@ -2672,7 +2688,7 @@ public class StreamTaskTest { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode())))) @@ -2704,7 +2720,7 @@ public class StreamTaskTest { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode())))) ); } @@ -2734,14 +2750,14 @@ public class StreamTaskTest { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); } @@ -2771,7 +2787,7 @@ public class StreamTaskTest { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode())))) @@ -2803,7 +2819,7 @@ public class StreamTaskTest { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode())))) ); } @@ -2834,14 +2850,14 @@ public class StreamTaskTest { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 9d7df53adbe..d8bb35c000a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -459,7 +459,7 @@ public class TaskManagerTest { taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); - verify(activeTaskToClose).prepareCommit(); + verify(activeTaskToClose).prepareCommit(false); 
verify(activeTaskToClose).suspend(); verify(activeTaskToClose).closeDirty(); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); @@ -500,7 +500,7 @@ public class TaskManagerTest { taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); - verify(standbyTaskToClose).prepareCommit(); + verify(standbyTaskToClose).prepareCommit(false); verify(standbyTaskToClose).suspend(); verify(standbyTaskToClose).closeDirty(); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); @@ -996,7 +996,7 @@ public class TaskManagerTest { taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); - verify(activeTaskToRecycle).prepareCommit(); + verify(activeTaskToRecycle).prepareCommit(true); verify(tasks).addPendingTasksToInit(Set.of(standbyTask)); verify(tasks).removeTask(activeTaskToRecycle); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); @@ -1019,7 +1019,7 @@ public class TaskManagerTest { taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); - verify(activeTaskToRecycle).prepareCommit(); + verify(activeTaskToRecycle).prepareCommit(true); verify(tasks).replaceActiveWithStandby(standbyTask); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); @@ -1059,7 +1059,7 @@ public class TaskManagerTest { taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); - verify(activeTaskToClose).prepareCommit(); + verify(activeTaskToClose).prepareCommit(true); verify(activeTaskToClose).closeClean(); verify(tasks).removeTask(activeTaskToClose); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); @@ -1536,10 +1536,10 @@ public class TaskManagerTest { taskManager.handleLostAll(); - verify(task1).prepareCommit(); + verify(task1).prepareCommit(false); verify(task1).suspend(); verify(task1).closeDirty(); - verify(task2).prepareCommit(); + verify(task2).prepareCommit(false); verify(task2).suspend(); verify(task2).closeDirty(); } @@ -1569,7 +1569,7 @@ public class TaskManagerTest { verify(task1).suspend(); verify(task1).closeClean(); - verify(task2).prepareCommit(); + verify(task2).prepareCommit(false); verify(task2).suspend(); verify(task2).closeDirty(); verify(task3).suspend(); @@ -2386,10 +2386,10 @@ public class TaskManagerTest { taskManager.handleCorruption(Set.of(taskId02)); verify(activeRestoringTask, never()).commitNeeded(); - verify(activeRestoringTask, never()).prepareCommit(); + verify(activeRestoringTask, never()).prepareCommit(true); verify(activeRestoringTask, never()).postCommit(anyBoolean()); verify(standbyTask, never()).commitNeeded(); - verify(standbyTask, never()).prepareCommit(); + verify(standbyTask, never()).prepareCommit(true); verify(standbyTask, never()).postCommit(anyBoolean()); } @@ -2418,9 +2418,9 @@ public class TaskManagerTest { taskManager.handleCorruption(Set.of(taskId02)); verify(activeRestoringTask, never()).commitNeeded(); - verify(activeRestoringTask, never()).prepareCommit(); + verify(activeRestoringTask, never()).prepareCommit(true); verify(activeRestoringTask, never()).postCommit(anyBoolean()); - verify(standbyTask).prepareCommit(); + verify(standbyTask).prepareCommit(true); verify(standbyTask).postCommit(anyBoolean()); } @@ -2431,7 +2431,7 @@ public class TaskManagerTest { final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, 
false, stateManager); final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { @Override - public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { + public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) { throw new TaskMigratedException("You dropped out of the group!", new RuntimeException()); } }; @@ -3394,7 +3394,7 @@ public class TaskManagerTest { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { @Override - public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { + public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) { throw new RuntimeException("task 0_1 prepare commit boom!"); } }; @@ -3560,7 +3560,7 @@ public class TaskManagerTest { verify(activeTaskCreator).close(); verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); - verify(failedStatefulTask).prepareCommit(); + verify(failedStatefulTask).prepareCommit(false); verify(failedStatefulTask).suspend(); verify(failedStatefulTask).closeDirty(); } @@ -3634,16 +3634,16 @@ public class TaskManagerTest { verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); verify(tasks).addTask(removedStatefulTask); verify(tasks).addTask(removedStandbyTask); - verify(removedFailedStatefulTask).prepareCommit(); + verify(removedFailedStatefulTask).prepareCommit(false); verify(removedFailedStatefulTask).suspend(); verify(removedFailedStatefulTask).closeDirty(); - verify(removedFailedStandbyTask).prepareCommit(); + verify(removedFailedStandbyTask).prepareCommit(false); verify(removedFailedStandbyTask).suspend(); verify(removedFailedStandbyTask).closeDirty(); - verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(); + verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(false); verify(removedFailedStatefulTaskDuringRemoval).suspend(); verify(removedFailedStatefulTaskDuringRemoval).closeDirty(); - verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(); + verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(false); verify(removedFailedStandbyTaskDuringRemoval).suspend(); verify(removedFailedStandbyTaskDuringRemoval).closeDirty(); } @@ -3869,7 +3869,7 @@ public class TaskManagerTest { public void shouldPropagateExceptionFromActiveCommit() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { @Override - public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { + public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) { throw new RuntimeException("opsh."); } }; @@ -3893,7 +3893,7 @@ public class TaskManagerTest { public void shouldPropagateExceptionFromStandbyCommit() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { @Override - public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { + public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) { throw new RuntimeException("opsh."); } }; @@ -4689,7 +4689,7 @@ public class TaskManagerTest { final StandbyTask standbyTask = mock(StandbyTask.class); when(standbyTask.id()).thenReturn(taskId00); when(standbyTask.isActive()).thenReturn(false); - when(standbyTask.prepareCommit()).thenReturn(Collections.emptyMap()); + when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap()); final StreamTask activeTask = mock(StreamTask.class); when(activeTask.id()).thenReturn(taskId00); @@ -4939,10 +4939,13 @@ 
public class TaskManagerTest { } @Override - public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { + public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) { commitPrepared = true; if (commitNeeded) { + if (!clean) { + return null; + } return committableOffsets; } else { return Collections.emptyMap(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java index aac2dd36b49..d43670429b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java @@ -66,7 +66,7 @@ public class DefaultTaskExecutorTest { when(task.isProcessable(anyLong())).thenReturn(true); when(task.id()).thenReturn(new TaskId(0, 0, "A")); when(task.process(anyLong())).thenReturn(true); - when(task.prepareCommit()).thenReturn(Collections.emptyMap()); + when(task.prepareCommit(true)).thenReturn(Collections.emptyMap()); } @AfterEach diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index a4cee67ad5f..81c90d043ce 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -591,7 +591,7 @@ public class TopologyTestDriver implements Closeable { // Process the record ... task.process(mockWallClockTime.milliseconds()); task.maybePunctuateStreamTime(); - commit(task.prepareCommit()); + commit(task.prepareCommit(true)); task.postCommit(true); captureOutputsAndReEnqueueInternalResults(); } @@ -709,7 +709,7 @@ public class TopologyTestDriver implements Closeable { mockWallClockTime.sleep(advance.toMillis()); if (task != null) { task.maybePunctuateSystemTime(); - commit(task.prepareCommit()); + commit(task.prepareCommit(true)); task.postCommit(true); } completeAllProcessableWork(); @@ -1130,7 +1130,7 @@ public class TopologyTestDriver implements Closeable { public void close() { if (task != null) { task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); task.closeClean(); }
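To summarize the behavioral change for readers skimming the patch: `Task#prepareCommit` now takes a `boolean clean` flag. Flushing happens on both paths, but committable offsets are only computed when `clean` is true; the dirty-close paths in `TaskManager` pass `false` and ignore the resulting `null`. The following is a minimal, illustrative sketch of that contract. It is not the actual Kafka Streams implementation; the class names (`SketchTask`, `SketchTaskManager`) and their collaborators are hypothetical stand-ins.

// Illustrative sketch only: shows the prepareCommit(clean) contract introduced by this
// commit. Class names and collaborators are hypothetical, not the real Streams internals.
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class SketchTask {
    private final Runnable flush;                                           // stands in for flushing caches and the record collector
    private final Supplier<Map<TopicPartition, OffsetAndMetadata>> offsets; // stands in for computing committable offsets

    SketchTask(final Runnable flush, final Supplier<Map<TopicPartition, OffsetAndMetadata>> offsets) {
        this.flush = flush;
        this.offsets = offsets;
    }

    // clean == true: flush and return the offsets that should be committed.
    // clean == false: only flush and return null, because a task that is closed dirty
    // must not be committed and its offsets are never needed.
    Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
        flush.run();
        if (!clean) {
            return null;
        }
        return offsets.get();
    }
}

class SketchTaskManager {
    // Dirty-close path: flush only and swallow errors, mirroring closeTaskDirty().
    static void closeTaskDirty(final SketchTask task) {
        try {
            task.prepareCommit(false); // return value is intentionally ignored
        } catch (final RuntimeException swallow) {
            // the task is closing dirty anyway; the real code only logs such errors
        }
    }

    // Clean-commit path: the returned offsets are collected and committed.
    static Map<TopicPartition, OffsetAndMetadata> prepareCommitClean(final SketchTask task) {
        final Map<TopicPartition, OffsetAndMetadata> committable = task.prepareCommit(true);
        return committable == null ? Collections.emptyMap() : committable;
    }
}

The `null` return on the dirty path mirrors the changes to `StandbyTask` and `StreamTask` above, and the new test `shouldNotGetOffsetsIfPrepareCommitDirty` asserts exactly that behavior.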