This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a46da90b8f6 KAFKA-10199: Add missing catch for lock exception (#14403)
a46da90b8f6 is described below

commit a46da90b8f6cca04dd0f89fa774267898c039e47
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Sep 26 10:58:37 2023 +0200

    KAFKA-10199: Add missing catch for lock exception (#14403)
    
    The state directory throws a lock exception during initialization if a task 
state directory is still locked by the stream thread that previously owned the 
task. When this happens, Streams catches the lock exception, ignores the 
exception, and tries to initialize the task in the next iteration.
    
    In the state updater code path, we missed catching the lock exception when 
Streams recycles a task. That leads to the lock exception being thrown to the 
exception handler, which is unexpected and leads to test failures.
    
    Reviewer: Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   | 23 ++++++-----
 .../processor/internals/TaskManagerTest.java       | 45 ++++++++++++++++++++--
 2 files changed, 56 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 68c8f008fbe..349d1ccccba 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -779,8 +779,7 @@ public class TaskManager {
             newTask = task.isActive() ?
                 convertActiveToStandby((StreamTask) task, inputPartitions) :
                 convertStandbyToActive((StandbyTask) task, inputPartitions);
-            newTask.initializeIfNeeded();
-            stateUpdater.add(newTask);
+            addTaskToStateUpdater(newTask);
         } catch (final RuntimeException e) {
             final TaskId taskId = task.id();
             final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. " +
@@ -843,13 +842,7 @@ public class TaskManager {
         final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
         for (final Task task : tasks.drainPendingTasksToInit()) {
             try {
-                task.initializeIfNeeded();
-                stateUpdater.add(task);
-            } catch (final LockException lockException) {
-                // The state directory may still be locked by another thread, 
when the rebalance just happened.
-                // Retry in the next iteration.
-                log.info("Encountered lock exception. Reattempting locking the 
state in the next iteration.", lockException);
-                tasks.addPendingTasksToInit(Collections.singleton(task));
+                addTaskToStateUpdater(task);
             } catch (final RuntimeException e) {
                 // need to add task back to the bookkeeping to be handled by 
the stream thread
                 tasks.addTask(task);
@@ -860,6 +853,18 @@ public class TaskManager {
         maybeThrowTaskExceptions(taskExceptions);
     }
 
+    private void addTaskToStateUpdater(final Task task) {
+        try {
+            task.initializeIfNeeded();
+            stateUpdater.add(task);
+        } catch (final LockException lockException) {
+            // The state directory may still be locked by another thread, when 
the rebalance just happened.
+            // Retry in the next iteration.
+            log.info("Encountered lock exception. Reattempting locking the 
state in the next iteration.", lockException);
+            tasks.addPendingTasksToInit(Collections.singleton(task));
+        }
+    }
+
     public void handleExceptionsFromStateUpdater() {
         final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 5ac9198ac84..b00102810c7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -778,18 +778,57 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
         final LockException lockException = new LockException("Where are my 
keys??");
-        doThrow(lockException)
-            .when(task00).initializeIfNeeded();
+        doThrow(lockException).when(task00).initializeIfNeeded();
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task00).initializeIfNeeded();
         Mockito.verify(task01).initializeIfNeeded();
-        
Mockito.verify(tasks).addPendingTasksToInit(Collections.singleton(task00));
+        Mockito.verify(tasks).addPendingTasksToInit(
+            Mockito.argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        Mockito.verify(stateUpdater, never()).add(task00);
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void 
shouldRetryInitializationWhenLockExceptionAfterRecyclingInStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING).build();
+        final StandbyTask task00Converted = standbyTask(taskId00, 
taskId00Partitions)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task01Converted = statefulTask(taskId01, 
taskId01Partitions)
+            .withInputPartitions(taskId01Partitions).build();
+        when(stateUpdater.hasRemovedTasks()).thenReturn(true);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, 
task01));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
+        
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        when(activeTaskCreator.createActiveTaskFromStandby(task01, 
taskId01Partitions,
+            consumer)).thenReturn(task01Converted);
+        when(standbyTaskCreator.createStandbyTaskFromActive(task00, 
taskId00Partitions))
+            .thenReturn(task00Converted);
+        final LockException lockException = new LockException("Where are my 
keys??");
+        doThrow(lockException).when(task00Converted).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        Mockito.verify(task00Converted).initializeIfNeeded();
+        Mockito.verify(task01Converted).initializeIfNeeded();
+        Mockito.verify(tasks).addPendingTasksToInit(
+            Mockito.argThat(tasksToInit -> 
tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
+        );
+        Mockito.verify(stateUpdater, never()).add(task00Converted);
+        Mockito.verify(stateUpdater).add(task01Converted);
+    }
+
     @Test
     public void shouldRecycleTasksRemovedFromStateUpdater() {
         final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)

Reply via email to