This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c2492bca71 KAFKA-10199: Consider tasks in state updater when computing offset sums (#13925)
5c2492bca71 is described below

commit 5c2492bca71200806ccf776ea31639a90290d43e
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Jul 3 16:35:34 2023 +0200

    KAFKA-10199: Consider tasks in state updater when computing offset sums (#13925)
    
    With the state updater, the task manager also needs to look into the
    tasks owned by the state updater when computing the sum of offsets
    of the state. This sum of offsets is used by the high availability
    assignor to assign warm-up replicas.
    If the task manager does not take into account the tasks in the
    state updater, a warm-up replica will never report back that
    the state for the corresponding task has caught up. Consequently,
    the warm-up replica will never be dismissed and probing rebalances
    will never end.
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>, Walker Carlson <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   | 36 ++++++-----
 .../processor/internals/TaskManagerTest.java       | 70 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 35b0aab3613..9152d20721f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1141,25 +1141,30 @@ public class TaskManager {
         // Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
         // so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
         // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =
+            union(HashSet::new, lockedTaskDirectories, tasks.keySet());
+        for (final Task task : tasks.values()) {
+            if (task.state() != State.CREATED && task.state() != State.CLOSED) 
{
                 final Map<TopicPartition, Long> changelogOffsets = 
task.changelogOffsets();
                 if (changelogOffsets.isEmpty()) {
-                    log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}", id);
+                    log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}",
+                        task.id());
                 } else {
-                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, 
changelogOffsets));
+                    taskOffsetSums.put(task.id(), 
sumOfChangelogOffsets(task.id(), changelogOffsets));
                 }
-            } else {
-                final File checkpointFile = 
stateDirectory.checkpointFileFor(id);
-                try {
-                    if (checkpointFile.exists()) {
-                        taskOffsetSums.put(id, sumOfChangelogOffsets(id, new 
OffsetCheckpoint(checkpointFile).read()));
-                    }
-                } catch (final IOException e) {
-                    log.warn(String.format("Exception caught while trying to 
read checkpoint for task %s:", id), e);
+                
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id());
+            }
+        }
+
+        for (final TaskId id : 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) {
+            final File checkpointFile = stateDirectory.checkpointFileFor(id);
+            try {
+                if (checkpointFile.exists()) {
+                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, new 
OffsetCheckpoint(checkpointFile).read()));
                 }
+            } catch (final IOException e) {
+                log.warn(String.format("Exception caught while trying to read 
checkpoint for task %s:", id), e);
             }
         }
 
@@ -1177,6 +1182,7 @@ public class TaskManager {
         // current set of actually-locked tasks.
         lockedTaskDirectories.clear();
 
+        final Map<TaskId, Task> allTasks = allTasks();
         for (final TaskDirectory taskDir : 
stateDirectory.listNonEmptyTaskDirectories()) {
             final File dir = taskDir.file();
             final String namedTopology = taskDir.namedTopology();
@@ -1184,7 +1190,7 @@ public class TaskManager {
                 final TaskId id = parseTaskDirectoryName(dir.getName(), 
namedTopology);
                 if (stateDirectory.lock(id)) {
                     lockedTaskDirectories.add(id);
-                    if (!tasks.contains(id)) {
+                    if (!allTasks.containsKey(id)) {
                         log.debug("Temporarily locked unassigned task {} for 
the upcoming rebalance", id);
                     }
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 036e2ef7924..22da72feecd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1592,6 +1592,76 @@ public class TaskManagerTest {
         computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
     }
 
+    @Test
+    public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() 
throws Exception {
+        final StreamTask restoringStatefulTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final long changelogOffset = 42L;
+        
when(restoringStatefulTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog,
 changelogOffset)));
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
+        writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask));
+        replay(stateDirectory);
+        taskManager.handleRebalanceStart(singleton("topic"));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, 
changelogOffset))));
+    }
+
+    @Test
+    public void 
shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws 
Exception {
+        final StandbyTask restoringStandbyTask = standbyTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING).build();
+        final long changelogOffset = 42L;
+        
when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog,
 changelogOffset)));
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
+        writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask));
+        replay(stateDirectory);
+        taskManager.handleRebalanceStart(singleton("topic"));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, 
changelogOffset))));
+    }
+
+    @Test
+    public void 
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+        final StreamTask runningStatefulTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING).build();
+        final StreamTask restoringStatefulTask = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask restoringStandbyTask = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RUNNING).build();
+        final long changelogOffsetOfRunningTask = 42L;
+        final long changelogOffsetOfRestoringStatefulTask = 24L;
+        final long changelogOffsetOfRestoringStandbyTask = 84L;
+        when(runningStatefulTask.changelogOffsets())
+            .thenReturn(mkMap(mkEntry(t1p0changelog, 
changelogOffsetOfRunningTask)));
+        when(restoringStatefulTask.changelogOffsets())
+            .thenReturn(mkMap(mkEntry(t1p1changelog, 
changelogOffsetOfRestoringStatefulTask)));
+        when(restoringStandbyTask.changelogOffsets())
+            .thenReturn(mkMap(mkEntry(t1p2changelog, 
changelogOffsetOfRestoringStandbyTask)));
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask, 
restoringStatefulTask));
+
+        assertThat(
+            taskManager.getTaskOffsetSums(),
+            is(mkMap(
+                mkEntry(taskId00, changelogOffsetOfRunningTask),
+                mkEntry(taskId01, changelogOffsetOfRestoringStatefulTask),
+                mkEntry(taskId02, changelogOffsetOfRestoringStandbyTask)
+            ))
+        );
+    }
+
     @Test
     public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws 
Exception {
         final Map<TopicPartition, Long> changelogOffsets = mkMap(

Reply via email to