This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2d04370bca7 KAFKA-10199: Fix restoration behavior for paused tasks
(#14437)
2d04370bca7 is described below
commit 2d04370bca7ab5995371ce5501b2248c279e1d6f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 26 14:05:55 2023 +0200
KAFKA-10199: Fix restoration behavior for paused tasks (#14437)
The state updater can get into a busy loop when all tasks are paused, because
changelogReader will never report that all changelogs have been read
completely. Fix this by awaiting when updatingTasks is empty.
Related and included: if we are restoring and all tasks are paused, we
should return immediately from StoreChangelogReader.
Reviewer: Bruno Cadonna <[email protected]>
---
.../processor/internals/DefaultStateUpdater.java | 14 ++++++++++--
.../processor/internals/StoreChangelogReader.java | 8 +++----
.../internals/DefaultStateUpdaterTest.java | 26 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index e37337d8ccf..c2872caee5b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -73,6 +73,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final ChangelogReader changelogReader;
private final StateUpdaterMetrics updaterMetrics;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
+ private final AtomicBoolean isIdle = new AtomicBoolean(false);
private final Map<TaskId, Task> updatingTasks = new
ConcurrentHashMap<>();
private final Map<TaskId, Task> pausedTasks = new
ConcurrentHashMap<>();
@@ -307,10 +308,10 @@ public class DefaultStateUpdater implements StateUpdater {
tasksAndActionsLock.lock();
try {
while (isRunning.get() &&
- changelogReader.allChangelogsCompleted() &&
+ (changelogReader.allChangelogsCompleted() ||
updatingTasks.isEmpty()) &&
tasksAndActions.isEmpty() &&
!isTopologyResumed.get()) {
-
+ isIdle.set(true);
tasksAndActionsCondition.await();
}
} catch (final InterruptedException ignored) {
@@ -318,6 +319,7 @@ public class DefaultStateUpdater implements StateUpdater {
// and hence this exception should never be thrown
} finally {
tasksAndActionsLock.unlock();
+ isIdle.set(false);
}
}
@@ -768,6 +770,14 @@ public class DefaultStateUpdater implements StateUpdater {
);
}
+ // used for testing
+ boolean isIdle() {
+ if (stateUpdaterThread != null) {
+ return stateUpdaterThread.isIdle.get();
+ }
+ return false;
+ }
+
private <T> Set<T> executeWithQueuesLocked(final Supplier<Set<T>> action) {
tasksAndActionsLock.lock();
restoredActiveTasksLock.lock();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 32427cb8dea..2a8c215e9b1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -808,15 +808,15 @@ public class StoreChangelogReader implements
ChangelogReader {
private void initializeChangelogs(final Map<TaskId, Task> tasks,
final Set<ChangelogMetadata>
newPartitionsToRestore) {
- if (newPartitionsToRestore.isEmpty()) {
- return;
- }
-
// for those changelog partitions whose tasks are not included, in
means those tasks
// are paused at the moment, and hence we should not try to initialize
those
// changelogs yet
filterNewPartitionsToRestore(tasks, newPartitionsToRestore);
+ if (newPartitionsToRestore.isEmpty()) {
+ return;
+ }
+
// for active changelogs, we need to find their end offset before
transit to restoring
// if the changelog is on source topic, then its end offset should be
the minimum of
// its committed offset and its end offset; for standby tasks that use
source topics
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 05f68e78cd2..13ac8e48f10 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1043,6 +1043,24 @@ class DefaultStateUpdaterTest {
verify(changelogReader, times(2)).enforceRestoreActive();
}
+ @Test
+ public void shouldIdleWhenAllTasksPaused() throws Exception {
+ final StreamTask task = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ stateUpdater.start();
+ stateUpdater.add(task);
+
+ when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+ verifyPausedTasks(task);
+ verifyIdle();
+
+ when(topologyMetadata.isPaused(null)).thenReturn(false);
+ stateUpdater.signalResume();
+
+ verifyPausedTasks();
+ verifyUpdatingTasks(task);
+ }
+
@Test
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0,
mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
@@ -1767,6 +1785,14 @@ class DefaultStateUpdaterTest {
}
}
+ private void verifyIdle() throws Exception {
+ waitForCondition(
+ () -> stateUpdater.isIdle(),
+ VERIFICATION_TIMEOUT,
+ "State updater did not enter an idling state!"
+ );
+ }
+
private void verifyPausedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(