This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cef7ca13822fb8627ae93923ebbbca9a23872173
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Oct 30 13:36:35 2020 -0700

    KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks (#9515)

    Read offsets directly from the checkpoint file if a task is uninitialized or closed

    Reviewers: Bruno Cadonna <[email protected]>, John Roesler <[email protected]>
---
 .../streams/processor/internals/TaskManager.java  |  3 +-
 .../processor/internals/TaskManagerTest.java      | 66 +++++++++++++++++++---
 2 files changed, 60 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7f019c6..a1eb2fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -604,7 +604,8 @@ public class TaskManager {
         // just have an empty changelogOffsets map.
         for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {
             final Task task = tasks.get(id);
-            if (task != null) {
+            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
+            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
                 final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
                 if (changelogOffsets.isEmpty()) {
                     log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index a7433fa3..2c8b740 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode;
+import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.easymock.EasyMock;
@@ -95,7 +96,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -339,6 +339,57 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 5L),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
+
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        writeCheckpointFile(taskId00, changelogOffsets);
+        replay(stateDirectory);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
+        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
+        replay(activeTaskCreator);
+        taskManager.handleAssignment(taskId00Assignment, emptyMap());
+
+        assertThat(uninitializedTask.state(), is(State.CREATED));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+    }
+
+    @Test
+    public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 5L),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
+
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        writeCheckpointFile(taskId00, changelogOffsets);
+        replay(stateDirectory);
+
+        final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(closedTask));
+        replay(activeTaskCreator);
+        taskManager.handleAssignment(taskId00Assignment, emptyMap());
+
+        closedTask.suspend();
+        closedTask.closeClean();
+        assertThat(closedTask.state(), is(State.CLOSED));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+    }
+
+    @Test
     public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
         expectLockFailedFor(taskId00);
         makeTaskFolders(taskId00.toString());
@@ -2430,8 +2481,10 @@
                                            .map(t -> new StateMachineTask(t.getKey(), t.getValue(), false))
                                            .collect(Collectors.toSet());
         final Set<Task> restoringTasks = restoringActiveAssignment.entrySet().stream()
-            .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true))
-            .collect(Collectors.toSet());
+                                             .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true))
+                                             .collect(Collectors.toSet());
+        // give the restoring tasks some uncompleted changelog partitions so they'll stay in restoring
+        restoringTasks.forEach(t -> ((StateMachineTask) t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L)));
 
         // Initially assign only the active tasks we want to complete restoration
         final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new HashMap<>(runningActiveAssignment);
@@ -2439,17 +2492,14 @@
         final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
         allActiveTasks.addAll(restoringTasks);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(runningActiveAssignment))).andStubReturn(runningTasks);
         expect(standbyTaskCreator.createTasks(eq(standbyAssignment))).andStubReturn(standbyTasks);
         expect(activeTaskCreator.createTasks(anyObject(), eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
 
         expectRestoreToBeCompleted(consumer, changeLogReader);
         replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
 
-        taskManager.handleAssignment(runningActiveAssignment, standbyAssignment);
-        assertThat(taskManager.tryToCompleteRestoration(), is(true));
-
         taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
+        taskManager.tryToCompleteRestoration();
 
         final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
@@ -2459,7 +2509,7 @@
             allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : restoringTasks) {
-            assertThat(task.state(), not(Task.State.RUNNING));
+            assertThat(task.state(), is(Task.State.RESTORING));
            allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : standbyTasks) {
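For readers skimming the patch: the core of the fix is the new guard in the offset-sum loop of TaskManager shown above, which skips tasks in CREATED or CLOSED state so their offsets come from the on-disk checkpoint instead of the (empty) in-memory task. The snippet below is only a minimal sketch of what that checkpoint fallback could look like, not the committed code; the helper class/method names and the hard-coded ".checkpoint" file name are illustrative assumptions.

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

final class CheckpointOffsetSumSketch {
    // Illustrative helper (assumed names): sum the changelog offsets recorded in a
    // task's checkpoint file, for use when the task object holds no offsets in
    // memory, i.e. it is still CREATED or already CLOSED.
    static long offsetSumFromCheckpoint(final StateDirectory stateDirectory,
                                        final TaskId id) throws IOException {
        // ".checkpoint" inside the task directory is an assumption in this sketch
        final File checkpointFile = new File(stateDirectory.directoryForTask(id), ".checkpoint");
        final Map<TopicPartition, Long> offsets = new OffsetCheckpoint(checkpointFile).read();
        long sum = 0L;
        for (final long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }
}

The two new tests above exercise exactly this path: they write a checkpoint file for taskId00 and then assert that getTaskOffsetSums() reports the summed offsets while the task is still CREATED or already CLOSED.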
