This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0243bb98a7 HOTFIX: Revert KAFKA-10199 which is causing compilation
failures (#12532)
0243bb98a7 is described below
commit 0243bb98a7cc6bc4ed3c8f39b3a1ed9f70d0f8bd
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Aug 17 14:29:49 2022 -0700
HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)
Compilation is failing after these two commits:
```
> Task :streams:compileJava
/Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852:
error: cannot find symbol
tasks.addPendingTaskToClose(restoringTask.id());
^
symbol: method
addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
location: variable tasks of type
org.apache.kafka.streams.processor.internals.Tasks
1 error
```
Also here:
```
[2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
[2022-08-17T20:58:20.912Z]
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822:
error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in
class TaskManagerTest
[2022-08-17T20:58:20.912Z] private TaskManager setupForRevocation(final
Set<Task> tasksInStateUpdater,
```
This patch reverts them.
Reviewers: Ismael Juma <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 42 +------
.../kafka/streams/processor/internals/Tasks.java | 19 +--
.../processor/internals/TaskManagerTest.java | 129 +--------------------
3 files changed, 12 insertions(+), 178 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bab05a5184..03c36b0daf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
standbyTasksToCreate.remove(taskId);
} else {
stateUpdater.remove(taskId);
- tasks.addPendingTaskToCloseClean(taskId);
+ tasks.addPendingTaskToClose(taskId);
}
}
}
@@ -692,7 +692,7 @@ public class TaskManager {
taskExceptions.putIfAbsent(taskId, e);
}
- } else if (tasks.removePendingTaskToCloseClean(task.id())) {
+ } else if (tasks.removePendingTaskToClose(task.id())) {
try {
task.suspend();
task.closeClean();
@@ -710,8 +710,6 @@ public class TaskManager {
taskExceptions.putIfAbsent(task.id(), e);
}
- } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
- tasksToCloseDirty.add(task);
} else if ((inputPartitions =
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions,
topologyMetadata.nodeToSourceTopics(task.id()));
stateUpdater.add(task);
@@ -757,8 +755,6 @@ public class TaskManager {
}
}
- removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
-
if (!remainingRevokedPartitions.isEmpty()) {
log.debug("The following revoked partitions {} are missing from
the current task partitions. It could "
+ "potentially be due to race condition of consumer
detecting the heartbeat failure, or the tasks " +
@@ -844,20 +840,6 @@ public class TaskManager {
}
}
- private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition>
remainingRevokedPartitions) {
- if (stateUpdater != null) {
- for (final Task restoringTask : stateUpdater.getTasks()) {
- if (restoringTask.isActive()) {
- if
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
- tasks.addPendingTaskToClose(restoringTask.id());
- stateUpdater.remove(restoringTask.id());
-
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
- }
- }
- }
- }
- }
-
private void prepareCommitAndAddOffsetsToMap(final Set<Task>
tasksToPrepare,
final Map<Task,
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
for (final Task task : tasksToPrepare) {
@@ -885,15 +867,6 @@ public class TaskManager {
void handleLostAll() {
log.debug("Closing lost active tasks as zombies.");
- closeRunningTasksDirty();
- removeLostTasksFromStateUpdater();
-
- if (processingMode == EXACTLY_ONCE_V2) {
- activeTaskCreator.reInitializeThreadProducer();
- }
- }
-
- private void closeRunningTasksDirty() {
final Set<Task> allTask = tasks.allTasks();
for (final Task task : allTask) {
// Even though we've apparently dropped out of the group, we can
continue safely to maintain our
@@ -902,16 +875,9 @@ public class TaskManager {
closeTaskDirty(task);
}
}
- }
- private void removeLostTasksFromStateUpdater() {
- if (stateUpdater != null) {
- for (final Task restoringTask : stateUpdater.getTasks()) {
- if (restoringTask.isActive()) {
- tasks.addPendingTaskToCloseDirty(restoringTask.id());
- stateUpdater.remove(restoringTask.id());
- }
- }
+ if (processingMode == EXACTLY_ONCE_V2) {
+ activeTaskCreator.reInitializeThreadProducer();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 8178fe3691..9628b42d92 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -55,9 +55,7 @@ class Tasks {
private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new
HashMap<>();
private final Map<TaskId, Set<TopicPartition>>
pendingTasksToUpdateInputPartitions = new HashMap<>();
private final Set<Task> pendingTasksToInit = new HashSet<>();
- private final Set<TaskId> pendingTasksToCloseClean = new HashSet<>();
-
- private final Set<TaskId> pendingTasksToCloseDirty = new HashSet<>();
+ private final Set<TaskId> pendingTasksToClose = new HashSet<>();
// TODO: convert to Stream/StandbyTask when we remove
TaskManager#StateMachineTask with mocks
private final Map<TopicPartition, Task> activeTasksPerPartition = new
HashMap<>();
@@ -113,19 +111,12 @@ class Tasks {
pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
}
- boolean removePendingTaskToCloseDirty(final TaskId taskId) {
- return pendingTasksToCloseDirty.remove(taskId);
- }
- void addPendingTaskToCloseDirty(final TaskId taskId) {
- pendingTasksToCloseDirty.add(taskId);
- }
-
- boolean removePendingTaskToCloseClean(final TaskId taskId) {
- return pendingTasksToCloseClean.remove(taskId);
+ boolean removePendingTaskToClose(final TaskId taskId) {
+ return pendingTasksToClose.remove(taskId);
}
- void addPendingTaskToCloseClean(final TaskId taskId) {
- pendingTasksToCloseClean.add(taskId);
+ void addPendingTaskToClose(final TaskId taskId) {
+ pendingTasksToClose.add(taskId);
}
Set<Task> drainPendingTaskToInit() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index ff52ad5ae9..12ea6477e5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -118,7 +118,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
@@ -145,9 +144,7 @@ public class TaskManagerTest {
private final TaskId taskId02 = new TaskId(0, 2);
private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
- private final TopicPartition t1p2changelog = new
TopicPartition("changelog", 2);
private final Set<TopicPartition> taskId02Partitions = mkSet(t1p2);
- private final Set<TopicPartition> taskId02ChangelogPartitions =
mkSet(t1p2changelog);
private final TaskId taskId03 = new TaskId(0, 3);
private final TopicPartition t1p3 = new TopicPartition(topic1, 3);
@@ -346,8 +343,8 @@ public class TaskManagerTest {
expectLastCall().anyTimes();
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder,
consumer);
- taskManager.tasks().addPendingTaskToCloseClean(taskId00);
- taskManager.tasks().addPendingTaskToCloseClean(taskId01);
+ taskManager.tasks().addPendingTaskToClose(taskId00);
+ taskManager.tasks().addPendingTaskToClose(taskId01);
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter
-> { });
@@ -386,45 +383,6 @@ public class TaskManagerTest {
Mockito.verify(stateUpdater).add(task01);
}
- @Test
- public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
- final StreamTask task1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
- final StandbyTask task2 = standbyTask(taskId01,
taskId01ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId01Partitions).build();
- final StreamTask task3 = statefulTask(taskId02,
taskId02ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId02Partitions).build();
- final TaskManager taskManager = setupForRevocation(mkSet(task1, task2,
task3), mkSet(task1, task3));
-
- taskManager.handleLostAll();
-
- Mockito.verify(stateUpdater).remove(task1.id());
- Mockito.verify(stateUpdater).remove(task3.id());
-
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- Mockito.verify(task1).closeDirty();
- Mockito.verify(task3).closeDirty();
- Mockito.verify(task2, never()).closeDirty();
- Mockito.verify(task2, never()).closeClean();
- }
-
- private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
- final Set<Task> removedTasks) {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
- when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
- when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
- expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
- consumer.resume(anyObject());
- expectLastCall().anyTimes();
- replay(consumer);
-
- return taskManager;
- }
-
@Test
public void shouldHandleRemovedTasksFromStateUpdater() {
// tasks to recycle
@@ -478,7 +436,7 @@ public class TaskManagerTest {
stateUpdater
);
taskManager.setMainConsumer(consumer);
- taskManager.tasks().addPendingTaskToCloseClean(taskId02);
+ taskManager.tasks().addPendingTaskToClose(taskId02);
taskManager.tasks().addPendingTaskToRecycle(taskId00,
taskId00Partitions);
taskManager.tasks().addPendingTaskToRecycle(taskId01,
taskId01Partitions);
taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId03,
taskId03Partitions);
@@ -751,87 +709,6 @@ public class TaskManagerTest {
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
- @Test
- public void
shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
{
- final StreamTask task = statefulTask(taskId00,
taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
- final TaskManager taskManager = setupForRevocation(mkSet(task),
mkSet(task));
-
- taskManager.handleRevocation(taskId00Partitions);
-
- Mockito.verify(stateUpdater).remove(task.id());
-
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- Mockito.verify(task).closeClean();
- }
-
- public void
shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
{
- final StreamTask task1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
- final StreamTask task2 = statefulTask(taskId01,
taskId01ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId01Partitions).build();
- final TaskManager taskManager = setupForRevocation(mkSet(task1,
task2), mkSet(task1, task2));
-
- taskManager.handleRevocation(union(HashSet::new, taskId00Partitions,
taskId01Partitions));
-
- Mockito.verify(stateUpdater).remove(task1.id());
- Mockito.verify(stateUpdater).remove(task2.id());
-
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- Mockito.verify(task1).closeClean();
- Mockito.verify(task2).closeClean();
- }
-
- @Test
- public void
shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation()
{
- final StreamTask task = statefulTask(taskId00,
taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
- final TaskManager taskManager = setupForRevocation(mkSet(task),
Collections.emptySet());
-
- taskManager.handleRevocation(taskId01Partitions);
-
- Mockito.verify(stateUpdater, never()).remove(task.id());
-
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- Mockito.verify(task, never()).closeClean();
- }
-
- @Test
- public void shouldNotRemoveStandbyTaskFromStateUpdaterOnRevocation() {
- final StandbyTask task = standbyTask(taskId00,
taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
- final TaskManager taskManager = setupForRevocation(mkSet(task),
Collections.emptySet());
-
- taskManager.handleRevocation(taskId00Partitions);
-
- Mockito.verify(stateUpdater, never()).remove(task.id());
-
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- Mockito.verify(task, never()).closeClean();
- }
-
- private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
- final Set<Task> removedTasks) {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
- when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
- when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
- expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
- consumer.resume(anyObject());
- expectLastCall().anyTimes();
- replay(consumer);
-
- return taskManager;
- }
-
@Test
public void
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true);