This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 90e4e1e KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)
90e4e1e is described below
commit 90e4e1e61614d1c55c61b56861eac0a1d715485b
Author: Boyang Chen <[email protected]>
AuthorDate: Thu Oct 31 17:14:29 2019 -0700
KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)
This is a typo bug which is due to calling a wrong map.
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang
<[email protected]>
---
.../processor/internals/AssignedStreamsTasks.java | 2 +-
.../internals/AssignedStreamsTasksTest.java | 143 +++++++++++++++++++++
2 files changed, 144 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 63dac25..161714e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -281,7 +281,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
firstException.compareAndSet(null, closeNonRunning(true,
created.get(id), lostTaskChangelogs));
} else if (restoring.containsKey(id)) {
log.debug("Closing the zombie restoring stream task {}.", id);
- firstException.compareAndSet(null, closeRestoring(true,
created.get(id), lostTaskChangelogs));
+ firstException.compareAndSet(null, closeRestoring(true,
restoring.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true,
running.get(id), lostTaskChangelogs));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 68ca9bd..42dc58b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.function.ThrowingRunnable;
@@ -49,6 +50,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
@@ -517,6 +519,147 @@ public class AssignedStreamsTasksTest {
assignedTasks.shutdown(true);
}
+ @Test
+ public void shouldClearZombieCreatedTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ task.close(false, true);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.addNewTask(task);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.created.keySet();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return clearingPartitions;
+ }
+ }.createTaskAndClear();
+ }
+
+ @Test
+ public void shouldClearZombieRestoringTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+ task.closeStateManager(false);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.addTaskToRestoring(task);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.restoringTaskIds();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return clearingPartitions;
+ }
+ }.createTaskAndClear();
+ }
+
+ @Test
+ public void shouldClearZombieRunningTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ task.initializeTopology();
+
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+ task.close(false, true);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.transitionToRunning(task);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.runningTaskIds();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return clearingPartitions;
+ }
+ }.createTaskAndClear();
+ }
+
+ @Test
+ public void shouldClearZombieSuspendedTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ task.initializeTopology();
+
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+ task.suspend();
+ task.closeSuspended(false, null);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.transitionToRunning(task);
+ final List<TopicPartition> revokedChangelogs = new
ArrayList<>();
+ final List<TaskId> ids = Collections.singletonList(task.id());
+ assignedTasks.suspendOrCloseTasks(new HashSet<>(ids),
revokedChangelogs);
+ assertEquals(clearingPartitions, revokedChangelogs);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.suspendedTaskIds();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return Collections.emptyList();
+ }
+ }.createTaskAndClear();
+ }
+
+ abstract class TaskTestSuite {
+
+ TaskId clearingTaskId = new TaskId(0, 0);
+ List<TopicPartition> clearingPartitions =
Collections.singletonList(new TopicPartition("topic", 0));
+
+ abstract void additionalSetup(final StreamTask task);
+
+ abstract void action(final StreamTask task);
+
+ abstract Set<TaskId> taskIds();
+
+ abstract List<TopicPartition> expectedLostChangelogs();
+
+ void createTaskAndClear() {
+ final StreamTask task = EasyMock.createMock(StreamTask.class);
+ EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
+
EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
+ additionalSetup(task);
+ EasyMock.replay(task);
+
+ action(task);
+ final List<TopicPartition> changelogs = new ArrayList<>();
+ final Set<TaskId> ids = new
HashSet<>(Collections.singleton(task.id()));
+ assertEquals(ids, taskIds());
+
+ assignedTasks.closeZombieTasks(ids, changelogs);
+ assertEquals(Collections.emptySet(), taskIds());
+ assertEquals(expectedLostChangelogs(), changelogs);
+ }
+ }
+
private void addAndInitTask() {
assignedTasks.addNewTask(t1);
assignedTasks.initializeNewTasks();