This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7f730d9d9a MINOR: Only commit running active and standby tasks when tasks corrupted (#14508)
c7f730d9d9a is described below

commit c7f730d9d9a95ce43b19a8af06e9f5be5e25609b
Author: Bruno Cadonna <[email protected]>
AuthorDate: Thu Oct 12 13:24:54 2023 +0200

    MINOR: Only commit running active and standby tasks when tasks corrupted (#14508)
    
    When tasks are found corrupted, Kafka Streams tries to commit
    the non-corrupted tasks before closing and reviving the corrupted
    active tasks. Besides active running tasks, Kafka Streams tries
    to commit restoring active tasks and standby tasks. However,
    restoring active tasks do not need to be committed since they
    do not have offsets to commit and the current code does not
    write a checkpoint. Furthermore, trying to commit restoring
    active tasks with the state updater enabled results in the
    following error:
    
    java.lang.UnsupportedOperationException: This task is read-only
    at org.apache.kafka.streams.processor.internals.ReadOnlyTask.commitNeeded(ReadOnlyTask.java:209)
    ...
    
    since commitNeeded() is not a read-only method for active tasks.
    
    In the future, we can consider writing a checkpoint for active
    restoring tasks in this situation. Additionally, we should
    fix commitNeeded() in active tasks to be read-only.
    
    Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/ReadOnlyTask.java  |  5 ++++-
 .../streams/processor/internals/TaskManager.java   |  5 +----
 .../processor/internals/ReadOnlyTaskTest.java      | 26 ++++++++++++++++++++++
 .../processor/internals/TaskManagerTest.java       | 26 ++++++++++++++++++++++
 4 files changed, 57 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index cfac1f13b21..fd91b6e282b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -206,7 +206,10 @@ public class ReadOnlyTask implements Task {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        if (task.isActive()) {
+            throw new UnsupportedOperationException("This task is read-only");
+        }
+        return task.commitNeeded();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index cf6f6b6326f..42da481a8c4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -223,10 +223,7 @@ public class TaskManager {
             final Collection<Task> tasksToCommit = allTasks()
                 .values()
                 .stream()
-                // TODO: once we remove state restoration from the stream 
thread, we can also remove
-                //  the RESTORING state here, since there will not be any 
restoring tasks managed
-                //  by the stream thread anymore.
-                .filter(t -> t.state() == Task.State.RUNNING || t.state() == 
Task.State.RESTORING)
+                .filter(t -> t.state() == Task.State.RUNNING)
                 .filter(t -> !corruptedTasks.contains(t.id()))
                 .collect(Collectors.toSet());
             commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, new 
HashMap<>());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
index e0c8eaf4056..028a041f0cc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -28,6 +28,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -41,6 +44,7 @@ class ReadOnlyTaskTest {
             add("inputPartitions");
             add("changelogPartitions");
             add("commitRequested");
+            add("commitNeeded");
             add("isActive");
             add("changelogOffsets");
             add("state");
@@ -126,6 +130,28 @@ class ReadOnlyTaskTest {
         verify(task).state();
     }
 
+    @Test
+    public void shouldDelegateCommitNeededIfStandby() {
+        final StandbyTask standbyTask =
+            standbyTask(new TaskId(1, 0), mkSet(new TopicPartition("topic", 
0))).build();
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(standbyTask);
+
+        readOnlyTask.commitNeeded();
+
+        verify(standbyTask).commitNeeded();
+    }
+
+    @Test
+    public void 
shouldThrowUnsupportedOperationExceptionForCommitNeededIfActive() {
+        final StreamTask statefulTask =
+            statefulTask(new TaskId(1, 0), mkSet(new TopicPartition("topic", 
0))).build();
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(statefulTask);
+
+        final Exception exception = 
assertThrows(UnsupportedOperationException.class, readOnlyTask::commitNeeded);
+
+        assertEquals("This task is read-only", exception.getMessage());
+    }
+
     @Test
     public void shouldThrowUnsupportedOperationExceptionForForbiddenMethods() {
         final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index a876d8ab4e1..85ab9c68688 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -95,6 +95,7 @@ import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.intersection;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -2330,6 +2331,31 @@ public class TaskManagerTest {
         
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
+    @Test
+    public void 
shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitStandbyTasks() {
+        final StreamTask activeRestoringTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING).build();
+        final StreamTask corruptedTask = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .withInputPartitions(taskId02Partitions)
+            .inState(State.RUNNING).build();
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
+        when(tasks.task(taskId02)).thenReturn(corruptedTask);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        when(stateUpdater.getTasks()).thenReturn(mkSet(activeRestoringTask, 
standbyTask));
+        expect(consumer.assignment()).andReturn(intersection(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
+        replay(consumer);
+
+        taskManager.handleCorruption(mkSet(taskId02));
+
+        Mockito.verify(activeRestoringTask, never()).commitNeeded();
+        Mockito.verify(standbyTask, times(2)).commitNeeded();
+    }
+
     @Test
     public void 
shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
         final ProcessorStateManager stateManager = 
Mockito.mock(ProcessorStateManager.class);

Reply via email to