This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2da54180407 KAFKA-16876: TaskManager.handleRevocation doesn't handle
errors thrown from task.prepareCommit (#22282)
2da54180407 is described below
commit 2da541804077dee04025fae47070ffa5d1b029b3
Author: gabriellafu <[email protected]>
AuthorDate: Tue May 26 14:48:28 2026 -0400
KAFKA-16876: TaskManager.handleRevocation doesn't handle errors thrown from
task.prepareCommit (#22282)
1. Fix TaskManager.handleRevocation to always suspend revoked tasks,
even when prepareCommit throws (e.g. a TaskMigratedException from
producer.send during a cache flush). Previously the exception propagated
uncaught and skipped the suspend loop entirely, leaving the revoked tasks
in the RUNNING state; handleAssignment then failed with an
IllegalStateException when it tried to close them.
2. Wrap the prepareCommit, commit, and postCommit phases in try-finally so
that the suspend loop and the task unlock are guaranteed to run
regardless of where an exception occurs.
3. Preserve all exceptions via addSuppressed instead of silently
dropping every exception after the first. The first exception remains the
primary thrown exception for backward compatibility, but subsequent exceptions
(e.g. the IllegalStateException from closing an unsuspended task) are
now attached to it as suppressed exceptions instead of being lost.
Reporting the same exception instance a second time is a no-op, because
Throwable.addSuppressed rejects self-suppression.
4. Update all callers in shutdown(), tryCloseCleanActiveTasks(), and
handleRevocation to use maybeSetFirstException consistently. This is a
behavior change: secondary failures during shutdown and task close are
now visible as suppressed exceptions instead of being lost.
Reviewers: Lucas Brutschy <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 187 ++++++++++++---------
.../processor/internals/TaskManagerTest.java | 50 ++++++
2 files changed, 153 insertions(+), 84 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 337e70492c0..a25e0e32f88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -430,8 +430,8 @@ public class TaskManager {
if (exception instanceof TaskMigratedException) {
lastTaskMigrated = (TaskMigratedException) exception;
} else if (exception instanceof TaskCorruptedException) {
- log.warn("Encounter corrupted task " + taskId + ",
will group it with other corrupted tasks " +
- "and handle together", exception);
+ log.warn("Encounter corrupted task {}, will group it
with other corrupted tasks " +
+ "and handle together", taskId, exception);
aggregatedCorruptedTaskIds.add(taskId);
} else {
((StreamsException) exception).setTaskId(taskId);
@@ -1035,105 +1035,120 @@ public class TaskManager {
final Set<TaskId> lockedTaskIds =
activeRunningTaskIterable().stream().map(Task::id).collect(Collectors.toSet());
maybeLockTasks(lockedTaskIds);
+ // After locking, everything must be inside try-finally to guarantee
suspend and unlock.
+ final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> tasksToSkipPostCommit = new
TreeSet<>(Comparator.comparing(Task::id));
boolean revokedTasksNeedCommit = false;
- for (final StreamTask task : activeRunningTaskIterable()) {
- if
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
- // when the task input partitions are included in the revoked
list,
- // this is an active task and should be revoked
+ boolean prepareCommitSucceeded = false;
+ try {
+ for (final StreamTask task : activeRunningTaskIterable()) {
+ if
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
+ // when the task input partitions are included in the
revoked list,
+ // this is an active task and should be revoked
- revokedActiveTasks.add(task);
- remainingRevokedPartitions.removeAll(task.inputPartitions());
+ revokedActiveTasks.add(task);
+
remainingRevokedPartitions.removeAll(task.inputPartitions());
- revokedTasksNeedCommit |= task.commitNeeded();
- } else if (task.commitNeeded()) {
- commitNeededActiveTasks.add(task);
+ revokedTasksNeedCommit |= task.commitNeeded();
+ } else if (task.commitNeeded()) {
+ commitNeededActiveTasks.add(task);
+ }
}
- }
- revokeTasksInStateUpdater(remainingRevokedPartitions);
+ revokeTasksInStateUpdater(remainingRevokedPartitions);
- if (!remainingRevokedPartitions.isEmpty()) {
- log.debug("The following revoked partitions {} are missing from
the current task partitions. It could "
- + "potentially be due to race condition of consumer
detecting the heartbeat failure, or the tasks " +
- "have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
- }
-
- if (revokedTasksNeedCommit) {
- prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
- // if we need to commit any revoking task then we just commit all
of those needed committing together
- prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
- }
-
- // even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
- // any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
- // as such we just need to skip those dirty tasks in the checkpoint
- final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
- try {
- if (revokedTasksNeedCommit) {
- // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
- // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to
make sure we don't skip the
- // offset commit because we are in a rebalance
-
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ if (!remainingRevokedPartitions.isEmpty()) {
+ log.debug("The following revoked partitions {} are missing
from the current task partitions. It could "
+ + "potentially be due to race condition of
consumer detecting the heartbeat failure, or the tasks " +
+ "have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
}
- } catch (final TaskCorruptedException e) {
- log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
- e.corruptedTasks());
-
- // If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
- closeDirtyAndRevive(dirtyTasks, true);
- } catch (final TimeoutException e) {
- log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
-
- // If we hit a TimeoutException it must be ALOS, just close dirty
and revive without wiping the state
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- closeDirtyAndRevive(dirtyTasks, false);
- } catch (final RuntimeException e) {
- log.error("Exception caught while committing those revoked tasks "
+ revokedActiveTasks, e);
- firstException.compareAndSet(null, e);
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- }
- // we enforce checkpointing upon suspending a task: if it is resumed
later we just proceed normally, if it is
- // going to be closed we would checkpoint by then
- for (final Task task : revokedActiveTasks) {
- if (!dirtyTasks.contains(task)) {
+ if (revokedTasksNeedCommit) {
try {
- task.postCommit(true);
+ prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
+ // if we need to commit any revoking task then we just
commit all of those needed committing together
+ prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
+ prepareCommitSucceeded = true;
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " +
task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ log.error("Exception caught while preparing to commit
revoked tasks {} and commit-needed tasks {}", revokedActiveTasks,
commitNeededActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ tasksToSkipPostCommit.addAll(revokedActiveTasks);
+ tasksToSkipPostCommit.addAll(commitNeededActiveTasks);
}
}
- }
- if (revokedTasksNeedCommit) {
- for (final Task task : commitNeededActiveTasks) {
- if (!dirtyTasks.contains(task)) {
+ try {
+ if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+ // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
+ // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap()
to make sure we don't skip the
+ // offset commit because we are in a rebalance
+
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ }
+ } catch (final TaskCorruptedException e) {
+ log.warn("Some tasks were corrupted when trying to commit
offsets, these will be cleaned and revived: {}",
+ e.corruptedTasks());
+
+ // If we hit a TaskCorruptedException it must be EOS, just
handle the cleanup for those corrupted tasks right here
+ dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+ closeDirtyAndRevive(dirtyTasks, true);
+ } catch (final TimeoutException e) {
+ log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
+
+ // If we hit a TimeoutException it must be ALOS, just close
dirty and revive without wiping the state
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ closeDirtyAndRevive(dirtyTasks, false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while committing those revoked
tasks {}", revokedActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ }
+
+ // we enforce checkpointing upon suspending a task: if it is
resumed later we just proceed normally, if it is
+ // going to be closed we would checkpoint by then
+ for (final Task task : revokedActiveTasks) {
+ if (!dirtyTasks.contains(task) &&
!tasksToSkipPostCommit.contains(task)) {
try {
- // for non-revoking active tasks, we should not
enforce checkpoint
- // since if it is EOS enabled, no checkpoint should be
written while
- // the task is in RUNNING tate
- task.postCommit(false);
+ task.postCommit(true);
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task
" + task.id(), e);
+ log.error("Exception caught while post-committing task
{}", task.id(), e);
maybeSetFirstException(false,
maybeWrapTaskException(e, task.id()), firstException);
}
}
}
- }
- for (final Task task : revokedActiveTasks) {
+ if (revokedTasksNeedCommit) {
+ for (final Task task : commitNeededActiveTasks) {
+ if (!dirtyTasks.contains(task) &&
!tasksToSkipPostCommit.contains(task)) {
+ try {
+ // for non-revoking active tasks, we should not
enforce checkpoint
+ // since if it is EOS enabled, no checkpoint
should be written while
+ // the task is in RUNNING state
+ task.postCommit(false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while post-committing
task {}", task.id(), e);
+ maybeSetFirstException(false,
maybeWrapTaskException(e, task.id()), firstException);
+ }
+ }
+ }
+ }
+ } finally {
+ for (final Task task : revokedActiveTasks) {
+ try {
+ task.suspend();
+ } catch (final RuntimeException e) {
+ log.error("Caught the following exception while trying to
suspend revoked task {}", task.id(), e);
+ maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ }
+ }
+
try {
- task.suspend();
+ maybeUnlockTasks(lockedTaskIds);
} catch (final RuntimeException e) {
- log.error("Caught the following exception while trying to
suspend revoked task " + task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ log.error("Exception caught while unlocking tasks {}",
lockedTaskIds, e);
+ maybeSetFirstException(false, e, firstException);
}
}
- maybeUnlockTasks(lockedTaskIds);
-
if (firstException.get() != null) {
throw firstException.get();
}
@@ -1385,14 +1400,14 @@ public class TaskManager {
executeAndMaybeSwallow(
clean,
() -> closeAndCleanUpTasks(activeTasks, standbyTasks, clean),
- e -> firstException.compareAndSet(null, e),
+ e -> maybeSetFirstException(false, e, firstException),
e -> log.warn("Ignoring an exception while unlocking remaining
task directories.", e)
);
executeAndMaybeSwallow(
clean,
activeTaskCreator::close,
- e -> firstException.compareAndSet(null, e),
+ e -> maybeSetFirstException(false, e, firstException),
e -> log.warn("Ignoring an exception while closing thread
producer.", e)
);
@@ -1403,7 +1418,7 @@ public class TaskManager {
executeAndMaybeSwallow(
clean,
this::releaseLockedUnassignedTaskDirectories,
- e -> firstException.compareAndSet(null, e),
+ e -> maybeSetFirstException(false, e, firstException),
e -> log.warn("Ignoring an exception while unlocking remaining
task directories.", e)
);
@@ -1512,10 +1527,10 @@ public class TaskManager {
tasksToCloseDirty.add(task);
} catch (final StreamsException e) {
e.setTaskId(task.id());
- firstException.compareAndSet(null, e);
+ maybeSetFirstException(false, e, firstException);
tasksToCloseDirty.add(task);
} catch (final RuntimeException e) {
- firstException.compareAndSet(null, new StreamsException(e,
task.id()));
+ maybeSetFirstException(false, new StreamsException(e,
task.id()), firstException);
tasksToCloseDirty.add(task);
}
}
@@ -1528,7 +1543,7 @@ public class TaskManager {
try {
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
} catch (final RuntimeException e) {
- log.error("Exception caught while committing tasks " +
consumedOffsetsAndMetadataPerTask.keySet(), e);
+ log.error("Exception caught while committing tasks {}",
consumedOffsetsAndMetadataPerTask.keySet(), e);
// TODO: should record the task ids when handling this
exception
maybeSetFirstException(false, e, firstException);
@@ -1552,7 +1567,7 @@ public class TaskManager {
try {
task.postCommit(true);
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " +
task.id(), e);
+ log.error("Exception caught while post-committing task
{}", task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
tasksToCloseDirty.add(task);
tasksToCloseClean.remove(task);
@@ -2049,7 +2064,11 @@ public class TaskManager {
final RuntimeException exception,
final
AtomicReference<RuntimeException> firstException) {
if (!ignoreTaskMigrated || !(exception instanceof
TaskMigratedException)) {
- firstException.compareAndSet(null, exception);
+ if (!firstException.compareAndSet(null, exception)) {
+ if (exception != firstException.get()) {
+ firstException.get().addSuppressed(exception);
+ }
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 74dd97050ee..3a84f779e42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -78,6 +78,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -2993,6 +2994,55 @@ public class TaskManagerTest {
verify(task00).suspend();
}
+ @Test
+ public void shouldSuspendRevokedTasksWhenPrepareCommitThrows() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
+
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenThrow(new
TaskMigratedException("task migrated"));
+
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ final StreamsException thrown = assertThrows(StreamsException.class,
+ () -> taskManager.handleRevocation(taskId00Partitions));
+
+ assertInstanceOf(TaskMigratedException.class, thrown);
+ assertEquals(Optional.of(taskId00), thrown.taskId());
+
+ verify(task00).suspend();
+ verify(task00, never()).postCommit(anyBoolean());
+ }
+
+ @Test
+ public void
shouldAttachSuppressedExceptionWhenPrepareCommitAndSuspendBothFailDuringRevocation()
{
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
+
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenThrow(new
TaskMigratedException("task migrated"));
+ doThrow(new RuntimeException("suspend failed")).when(task00).suspend();
+
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ final StreamsException thrown = assertThrows(StreamsException.class,
+ () -> taskManager.handleRevocation(taskId00Partitions));
+
+ assertInstanceOf(TaskMigratedException.class, thrown);
+ assertEquals(1, thrown.getSuppressed().length);
+ assertInstanceOf(StreamsException.class, thrown.getSuppressed()[0]);
+ }
+
@Test
public void
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
// task being revoked, needs commit