This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d04370bca7 KAFKA-10199: Fix restoration behavior for paused tasks 
(#14437)
2d04370bca7 is described below

commit 2d04370bca7ab5995371ce5501b2248c279e1d6f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 26 14:05:55 2023 +0200

    KAFKA-10199: Fix restoration behavior for paused tasks (#14437)
    
    The state updater can get into a busy loop when all tasks are paused,
    because the changelog reader never reports that all changelogs have been
    read completely. Fix this by waiting on the condition variable when
    updatingTasks is empty.
    
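    Related change included here: if restoration is in progress and all tasks
    are paused, StoreChangelogReader should return immediately. To do this,
    initializeChangelogs now checks for an empty set of partitions after
    filtering out the changelogs of paused tasks, instead of before.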
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../processor/internals/DefaultStateUpdater.java   | 14 ++++++++++--
 .../processor/internals/StoreChangelogReader.java  |  8 +++----
 .../internals/DefaultStateUpdaterTest.java         | 26 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index e37337d8ccf..c2872caee5b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -73,6 +73,7 @@ public class DefaultStateUpdater implements StateUpdater {
         private final ChangelogReader changelogReader;
         private final StateUpdaterMetrics updaterMetrics;
         private final AtomicBoolean isRunning = new AtomicBoolean(true);
+        private final AtomicBoolean isIdle = new AtomicBoolean(false);
         private final Map<TaskId, Task> updatingTasks = new 
ConcurrentHashMap<>();
         private final Map<TaskId, Task> pausedTasks = new 
ConcurrentHashMap<>();
 
@@ -307,10 +308,10 @@ public class DefaultStateUpdater implements StateUpdater {
             tasksAndActionsLock.lock();
             try {
                 while (isRunning.get() &&
-                    changelogReader.allChangelogsCompleted() &&
+                    (changelogReader.allChangelogsCompleted() || 
updatingTasks.isEmpty()) &&
                     tasksAndActions.isEmpty() &&
                     !isTopologyResumed.get()) {
-
+                    isIdle.set(true);
                     tasksAndActionsCondition.await();
                 }
             } catch (final InterruptedException ignored) {
@@ -318,6 +319,7 @@ public class DefaultStateUpdater implements StateUpdater {
                 // and hence this exception should never be thrown
             } finally {
                 tasksAndActionsLock.unlock();
+                isIdle.set(false);
             }
         }
 
@@ -768,6 +770,14 @@ public class DefaultStateUpdater implements StateUpdater {
         );
     }
 
+    // used for testing
+    boolean isIdle() {
+        if (stateUpdaterThread != null) {
+            return stateUpdaterThread.isIdle.get();
+        }
+        return false;
+    }
+
     private <T> Set<T> executeWithQueuesLocked(final Supplier<Set<T>> action) {
         tasksAndActionsLock.lock();
         restoredActiveTasksLock.lock();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 32427cb8dea..2a8c215e9b1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -808,15 +808,15 @@ public class StoreChangelogReader implements 
ChangelogReader {
 
     private void initializeChangelogs(final Map<TaskId, Task> tasks,
                                       final Set<ChangelogMetadata> 
newPartitionsToRestore) {
-        if (newPartitionsToRestore.isEmpty()) {
-            return;
-        }
-
         // for those changelog partitions whose tasks are not included, in 
means those tasks
         // are paused at the moment, and hence we should not try to initialize 
those
         // changelogs yet
         filterNewPartitionsToRestore(tasks, newPartitionsToRestore);
 
+        if (newPartitionsToRestore.isEmpty()) {
+            return;
+        }
+
         // for active changelogs, we need to find their end offset before 
transit to restoring
         // if the changelog is on source topic, then its end offset should be 
the minimum of
         // its committed offset and its end offset; for standby tasks that use 
source topics
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 05f68e78cd2..13ac8e48f10 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1043,6 +1043,24 @@ class DefaultStateUpdaterTest {
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
 
+    @Test
+    public void shouldIdleWhenAllTasksPaused() throws Exception {
+        final StreamTask task = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+        verifyPausedTasks(task);
+        verifyIdle();
+
+        when(topologyMetadata.isPaused(null)).thenReturn(false);
+        stateUpdater.signalResume();
+
+        verifyPausedTasks();
+        verifyUpdatingTasks(task);
+    }
+
     @Test
     public void shouldResumeStandbyTask() throws Exception {
         final StandbyTask task = standbyTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
@@ -1767,6 +1785,14 @@ class DefaultStateUpdaterTest {
         }
     }
 
+    private void verifyIdle() throws Exception {
+        waitForCondition(
+            () -> stateUpdater.isIdle(),
+            VERIFICATION_TIMEOUT,
+            "State updater did not enter an idling state!"
+        );
+    }
+
     private void verifyPausedTasks(final Task... tasks) throws Exception {
         if (tasks.length == 0) {
             waitForCondition(

Reply via email to