This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0243bb98a7 HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)
0243bb98a7 is described below

commit 0243bb98a7cc6bc4ed3c8f39b3a1ed9f70d0f8bd
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Aug 17 14:29:49 2022 -0700

    HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)
    
    Compilation is failing after the two KAFKA-10199 commits were merged:
    ```
    > Task :streams:compileJava
    
    /Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
                                tasks.addPendingTaskToClose(restoringTask.id());
                                     ^
      symbol:   method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
      location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
    1 error
    ```
    
    Also here:
    ```
    
    [2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
    
    [2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in class TaskManagerTest
    
    [2022-08-17T20:58:20.912Z]     private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
    ```
    This patch reverts both commits.
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   |  42 +------
 .../kafka/streams/processor/internals/Tasks.java   |  19 +--
 .../processor/internals/TaskManagerTest.java       | 129 +--------------------
 3 files changed, 12 insertions(+), 178 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bab05a5184..03c36b0daf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
                 standbyTasksToCreate.remove(taskId);
             } else {
                 stateUpdater.remove(taskId);
-                tasks.addPendingTaskToCloseClean(taskId);
+                tasks.addPendingTaskToClose(taskId);
             }
         }
     }
@@ -692,7 +692,7 @@ public class TaskManager {
 
                     taskExceptions.putIfAbsent(taskId, e);
                 }
-            } else if (tasks.removePendingTaskToCloseClean(task.id())) {
+            } else if (tasks.removePendingTaskToClose(task.id())) {
                 try {
                     task.suspend();
                     task.closeClean();
@@ -710,8 +710,6 @@ public class TaskManager {
 
                     taskExceptions.putIfAbsent(task.id(), e);
                 }
-            } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
-                tasksToCloseDirty.add(task);
             } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
                 stateUpdater.add(task);
@@ -757,8 +755,6 @@ public class TaskManager {
             }
         }
 
-        removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
-
         if (!remainingRevokedPartitions.isEmpty()) {
             log.debug("The following revoked partitions {} are missing from 
the current task partitions. It could "
                           + "potentially be due to race condition of consumer 
detecting the heartbeat failure, or the tasks " +
@@ -844,20 +840,6 @@ public class TaskManager {
         }
     }
 
-    private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> 
remainingRevokedPartitions) {
-        if (stateUpdater != null) {
-            for (final Task restoringTask : stateUpdater.getTasks()) {
-                if (restoringTask.isActive()) {
-                    if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToClose(restoringTask.id());
-                        stateUpdater.remove(restoringTask.id());
-                        
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
-                    }
-                }
-            }
-        }
-    }
-
     private void prepareCommitAndAddOffsetsToMap(final Set<Task> 
tasksToPrepare,
                                                  final Map<Task, 
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
         for (final Task task : tasksToPrepare) {
@@ -885,15 +867,6 @@ public class TaskManager {
     void handleLostAll() {
         log.debug("Closing lost active tasks as zombies.");
 
-        closeRunningTasksDirty();
-        removeLostTasksFromStateUpdater();
-
-        if (processingMode == EXACTLY_ONCE_V2) {
-            activeTaskCreator.reInitializeThreadProducer();
-        }
-    }
-
-    private void closeRunningTasksDirty() {
         final Set<Task> allTask = tasks.allTasks();
         for (final Task task : allTask) {
             // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -902,16 +875,9 @@ public class TaskManager {
                 closeTaskDirty(task);
             }
         }
-    }
 
-    private void removeLostTasksFromStateUpdater() {
-        if (stateUpdater != null) {
-            for (final Task restoringTask : stateUpdater.getTasks()) {
-                if (restoringTask.isActive()) {
-                    tasks.addPendingTaskToCloseDirty(restoringTask.id());
-                    stateUpdater.remove(restoringTask.id());
-                }
-            }
+        if (processingMode == EXACTLY_ONCE_V2) {
+            activeTaskCreator.reInitializeThreadProducer();
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 8178fe3691..9628b42d92 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -55,9 +55,7 @@ class Tasks {
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new 
HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> 
pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
-    private final Set<TaskId> pendingTasksToCloseClean = new HashSet<>();
-
-    private final Set<TaskId> pendingTasksToCloseDirty = new HashSet<>();
+    private final Set<TaskId> pendingTasksToClose = new HashSet<>();
 
     // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new 
HashMap<>();
@@ -113,19 +111,12 @@ class Tasks {
         pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
     }
 
-    boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-        return pendingTasksToCloseDirty.remove(taskId);
-    }
-    void addPendingTaskToCloseDirty(final TaskId taskId) {
-        pendingTasksToCloseDirty.add(taskId);
-    }
-
-    boolean removePendingTaskToCloseClean(final TaskId taskId) {
-        return pendingTasksToCloseClean.remove(taskId);
+    boolean removePendingTaskToClose(final TaskId taskId) {
+        return pendingTasksToClose.remove(taskId);
     }
 
-    void addPendingTaskToCloseClean(final TaskId taskId) {
-        pendingTasksToCloseClean.add(taskId);
+    void addPendingTaskToClose(final TaskId taskId) {
+        pendingTasksToClose.add(taskId);
     }
 
     Set<Task> drainPendingTaskToInit() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index ff52ad5ae9..12ea6477e5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -118,7 +118,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 
@@ -145,9 +144,7 @@ public class TaskManagerTest {
 
     private final TaskId taskId02 = new TaskId(0, 2);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
-    private final TopicPartition t1p2changelog = new 
TopicPartition("changelog", 2);
     private final Set<TopicPartition> taskId02Partitions = mkSet(t1p2);
-    private final Set<TopicPartition> taskId02ChangelogPartitions = 
mkSet(t1p2changelog);
 
     private final TaskId taskId03 = new TaskId(0, 3);
     private final TopicPartition t1p3 = new TopicPartition(topic1, 3);
@@ -346,8 +343,8 @@ public class TaskManagerTest {
         expectLastCall().anyTimes();
         replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer);
 
-        taskManager.tasks().addPendingTaskToCloseClean(taskId00);
-        taskManager.tasks().addPendingTaskToCloseClean(taskId01);
+        taskManager.tasks().addPendingTaskToClose(taskId00);
+        taskManager.tasks().addPendingTaskToClose(taskId01);
 
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter 
-> { });
 
@@ -386,45 +383,6 @@ public class TaskManagerTest {
         Mockito.verify(stateUpdater).add(task01);
     }
 
-    @Test
-    public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
-        final StreamTask task1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
-        final StandbyTask task2 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId01Partitions).build();
-        final StreamTask task3 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
-        final TaskManager taskManager = setupForRevocation(mkSet(task1, task2, 
task3), mkSet(task1, task3));
-
-        taskManager.handleLostAll();
-
-        Mockito.verify(stateUpdater).remove(task1.id());
-        Mockito.verify(stateUpdater).remove(task3.id());
-
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        Mockito.verify(task1).closeDirty();
-        Mockito.verify(task3).closeDirty();
-        Mockito.verify(task2, never()).closeDirty();
-        Mockito.verify(task2, never()).closeClean();
-    }
-
-    private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
-                                           final Set<Task> removedTasks) {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-        when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
-        when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        replay(consumer);
-
-        return taskManager;
-    }
-
     @Test
     public void shouldHandleRemovedTasksFromStateUpdater() {
         // tasks to recycle
@@ -478,7 +436,7 @@ public class TaskManagerTest {
             stateUpdater
         );
         taskManager.setMainConsumer(consumer);
-        taskManager.tasks().addPendingTaskToCloseClean(taskId02);
+        taskManager.tasks().addPendingTaskToClose(taskId02);
         taskManager.tasks().addPendingTaskToRecycle(taskId00, 
taskId00Partitions);
         taskManager.tasks().addPendingTaskToRecycle(taskId01, 
taskId01Partitions);
         taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId03, 
taskId03Partitions);
@@ -751,87 +709,6 @@ public class TaskManagerTest {
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
     }
 
-    @Test
-    public void 
shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
-        final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
-        final TaskManager taskManager = setupForRevocation(mkSet(task), 
mkSet(task));
-
-        taskManager.handleRevocation(taskId00Partitions);
-
-        Mockito.verify(stateUpdater).remove(task.id());
-
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        Mockito.verify(task).closeClean();
-    }
-
-    public void 
shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
-        final StreamTask task1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
-        final StreamTask task2 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId01Partitions).build();
-        final TaskManager taskManager = setupForRevocation(mkSet(task1, 
task2), mkSet(task1, task2));
-
-        taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, 
taskId01Partitions));
-
-        Mockito.verify(stateUpdater).remove(task1.id());
-        Mockito.verify(stateUpdater).remove(task2.id());
-
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        Mockito.verify(task1).closeClean();
-        Mockito.verify(task2).closeClean();
-    }
-
-    @Test
-    public void 
shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
-        final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
-        final TaskManager taskManager = setupForRevocation(mkSet(task), 
Collections.emptySet());
-
-        taskManager.handleRevocation(taskId01Partitions);
-
-        Mockito.verify(stateUpdater, never()).remove(task.id());
-
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        Mockito.verify(task, never()).closeClean();
-    }
-
-    @Test
-    public void shouldNotRemoveStandbyTaskFromStateUpdaterOnRevocation() {
-        final StandbyTask task = standbyTask(taskId00, 
taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
-        final TaskManager taskManager = setupForRevocation(mkSet(task), 
Collections.emptySet());
-
-        taskManager.handleRevocation(taskId00Partitions);
-
-        Mockito.verify(stateUpdater, never()).remove(task.id());
-
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        Mockito.verify(task, never()).closeClean();
-    }
-
-    private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
-                                           final Set<Task> removedTasks) {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-        when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
-        when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        replay(consumer);
-
-        return taskManager;
-    }
-
     @Test
     public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true);

Reply via email to