This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c2492bca71 KAFKA-10199: Consider tasks in state updater when
computing offset sums (#13925)
5c2492bca71 is described below
commit 5c2492bca71200806ccf776ea31639a90290d43e
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Jul 3 16:35:34 2023 +0200
KAFKA-10199: Consider tasks in state updater when computing offset sums
(#13925)
With the state updater, the task manager also needs to look at the
tasks owned by the state updater when computing the sum of changelog
offsets of each task's state. This sum of offsets is used by the high
availability assignor to assign warm-up replicas.
If the task manager does not take into account tasks in the
state updater, a warm-up replica will never report back that
the state for the corresponding task has caught up. Consequently,
the warm-up replica will never be dismissed and probing rebalances
will never end.
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
<[email protected]>, Walker Carlson <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 36 ++++++-----
.../processor/internals/TaskManagerTest.java | 70 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 15 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 35b0aab3613..9152d20721f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1141,25 +1141,30 @@ public class TaskManager {
// Not all tasks will create directories, and there may be directories
for tasks we don't currently own,
// so we consider all tasks that are either owned or on disk. This
includes stateless tasks, which should
// just have an empty changelogOffsets map.
- for (final TaskId id : union(HashSet::new, lockedTaskDirectories,
tasks.allTaskIds())) {
- final Task task = tasks.contains(id) ? tasks.task(id) : null;
- // Closed and uninitialized tasks don't have any offsets so we
should read directly from the checkpoint
- if (task != null && task.state() != State.CREATED && task.state()
!= State.CLOSED) {
+ final Map<TaskId, Task> tasks = allTasks();
+ final Set<TaskId>
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =
+ union(HashSet::new, lockedTaskDirectories, tasks.keySet());
+ for (final Task task : tasks.values()) {
+ if (task.state() != State.CREATED && task.state() != State.CLOSED)
{
final Map<TopicPartition, Long> changelogOffsets =
task.changelogOffsets();
if (changelogOffsets.isEmpty()) {
- log.debug("Skipping to encode apparently stateless (or
non-logged) offset sum for task {}", id);
+ log.debug("Skipping to encode apparently stateless (or
non-logged) offset sum for task {}",
+ task.id());
} else {
- taskOffsetSums.put(id, sumOfChangelogOffsets(id,
changelogOffsets));
+ taskOffsetSums.put(task.id(),
sumOfChangelogOffsets(task.id(), changelogOffsets));
}
- } else {
- final File checkpointFile =
stateDirectory.checkpointFileFor(id);
- try {
- if (checkpointFile.exists()) {
- taskOffsetSums.put(id, sumOfChangelogOffsets(id, new
OffsetCheckpoint(checkpointFile).read()));
- }
- } catch (final IOException e) {
- log.warn(String.format("Exception caught while trying to
read checkpoint for task %s:", id), e);
+
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id());
+ }
+ }
+
+ for (final TaskId id :
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) {
+ final File checkpointFile = stateDirectory.checkpointFileFor(id);
+ try {
+ if (checkpointFile.exists()) {
+ taskOffsetSums.put(id, sumOfChangelogOffsets(id, new
OffsetCheckpoint(checkpointFile).read()));
}
+ } catch (final IOException e) {
+ log.warn(String.format("Exception caught while trying to read
checkpoint for task %s:", id), e);
}
}
@@ -1177,6 +1182,7 @@ public class TaskManager {
// current set of actually-locked tasks.
lockedTaskDirectories.clear();
+ final Map<TaskId, Task> allTasks = allTasks();
for (final TaskDirectory taskDir :
stateDirectory.listNonEmptyTaskDirectories()) {
final File dir = taskDir.file();
final String namedTopology = taskDir.namedTopology();
@@ -1184,7 +1190,7 @@ public class TaskManager {
final TaskId id = parseTaskDirectoryName(dir.getName(),
namedTopology);
if (stateDirectory.lock(id)) {
lockedTaskDirectories.add(id);
- if (!tasks.contains(id)) {
+ if (!allTasks.containsKey(id)) {
log.debug("Temporarily locked unassigned task {} for
the upcoming rebalance", id);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 036e2ef7924..22da72feecd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1592,6 +1592,76 @@ public class TaskManagerTest {
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}
+ @Test
+ public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater()
throws Exception {
+ final StreamTask restoringStatefulTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING).build();
+ final long changelogOffset = 42L;
+
when(restoringStatefulTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffset)));
+ expectLockObtainedFor(taskId00);
+ makeTaskFolders(taskId00.toString());
+ final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
+ writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask));
+ replay(stateDirectory);
+ taskManager.handleRebalanceStart(singleton("topic"));
+
+ assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00,
changelogOffset))));
+ }
+
+ @Test
+ public void
shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws
Exception {
+ final StandbyTask restoringStandbyTask = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING).build();
+ final long changelogOffset = 42L;
+
when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffset)));
+ expectLockObtainedFor(taskId00);
+ makeTaskFolders(taskId00.toString());
+ final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
+ writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask));
+ replay(stateDirectory);
+ taskManager.handleRebalanceStart(singleton("topic"));
+
+ assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00,
changelogOffset))));
+ }
+
+ @Test
+ public void
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+ final StreamTask runningStatefulTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING).build();
+ final StreamTask restoringStatefulTask = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask restoringStandbyTask = standbyTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING).build();
+ final long changelogOffsetOfRunningTask = 42L;
+ final long changelogOffsetOfRestoringStatefulTask = 24L;
+ final long changelogOffsetOfRestoringStandbyTask = 84L;
+ when(runningStatefulTask.changelogOffsets())
+ .thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffsetOfRunningTask)));
+ when(restoringStatefulTask.changelogOffsets())
+ .thenReturn(mkMap(mkEntry(t1p1changelog,
changelogOffsetOfRestoringStatefulTask)));
+ when(restoringStandbyTask.changelogOffsets())
+ .thenReturn(mkMap(mkEntry(t1p2changelog,
changelogOffsetOfRestoringStandbyTask)));
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+ when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask,
restoringStatefulTask));
+
+ assertThat(
+ taskManager.getTaskOffsetSums(),
+ is(mkMap(
+ mkEntry(taskId00, changelogOffsetOfRunningTask),
+ mkEntry(taskId01, changelogOffsetOfRestoringStatefulTask),
+ mkEntry(taskId02, changelogOffsetOfRestoringStandbyTask)
+ ))
+ );
+ }
+
@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws
Exception {
final Map<TopicPartition, Long> changelogOffsets = mkMap(