This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cef7ca13822fb8627ae93923ebbbca9a23872173
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Oct 30 13:36:35 2020 -0700

    KAFKA-10651: read  offsets directly from checkpoint for uninitialized tasks 
(#9515)
    
    Read offsets directly from the checkpoint file if a task is uninitialized 
or closed
    
    Reviewers: Bruno Cadonna <[email protected]>, John Roesler 
<[email protected]>
---
 .../streams/processor/internals/TaskManager.java   |  3 +-
 .../processor/internals/TaskManagerTest.java       | 66 +++++++++++++++++++---
 2 files changed, 60 insertions(+), 9 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7f019c6..a1eb2fa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -604,7 +604,8 @@ public class TaskManager {
         // just have an empty changelogOffsets map.
         for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.keySet())) {
             final Task task = tasks.get(id);
-            if (task != null) {
+            // Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
+            if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
                 final Map<TopicPartition, Long> changelogOffsets = 
task.changelogOffsets();
                 if (changelogOffsets.isEmpty()) {
                     log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}", id);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index a7433fa3..2c8b740 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode;
+import org.apache.kafka.streams.processor.internals.Task.State;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.easymock.EasyMock;
@@ -95,7 +96,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -339,6 +339,57 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() 
throws Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 5L),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
15L));
+
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        writeCheckpointFile(taskId00, changelogOffsets);
+        replay(stateDirectory);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        final StateMachineTask uninitializedTask = new 
StateMachineTask(taskId00, taskId00Partitions, true);
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
+        replay(activeTaskCreator);
+        taskManager.handleAssignment(taskId00Assignment, emptyMap());
+
+        assertThat(uninitializedTask.state(), is(State.CREATED));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+    }
+
+    @Test
+    public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws 
Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 5L),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
15L));
+
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        writeCheckpointFile(taskId00, changelogOffsets);
+        replay(stateDirectory);
+
+        final StateMachineTask closedTask = new StateMachineTask(taskId00, 
taskId00Partitions, true);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singleton(closedTask));
+        replay(activeTaskCreator);
+        taskManager.handleAssignment(taskId00Assignment, emptyMap());
+
+        closedTask.suspend();
+        closedTask.closeClean();
+        assertThat(closedTask.state(), is(State.CLOSED));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+    }
+    
+    @Test
     public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
         expectLockFailedFor(taskId00);
         makeTaskFolders(taskId00.toString());
@@ -2430,8 +2481,10 @@ public class TaskManagerTest {
                                            .map(t -> new 
StateMachineTask(t.getKey(), t.getValue(), false))
                                            .collect(Collectors.toSet());
         final Set<Task> restoringTasks = 
restoringActiveAssignment.entrySet().stream()
-                                             .map(t -> new 
StateMachineTask(t.getKey(), t.getValue(), true))
-                                             .collect(Collectors.toSet());
+                                           .map(t -> new 
StateMachineTask(t.getKey(), t.getValue(), true))
+                                           .collect(Collectors.toSet());
+        // give the restoring tasks some uncompleted changelog partitions so 
they'll stay in restoring
+        restoringTasks.forEach(t -> ((StateMachineTask) 
t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L)));
 
         // Initially assign only the active tasks we want to complete 
restoration
         final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new 
HashMap<>(runningActiveAssignment);
@@ -2439,17 +2492,14 @@ public class TaskManagerTest {
         final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
         allActiveTasks.addAll(restoringTasks);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(runningActiveAssignment))).andStubReturn(runningTasks);
         
expect(standbyTaskCreator.createTasks(eq(standbyAssignment))).andStubReturn(standbyTasks);
         expect(activeTaskCreator.createTasks(anyObject(), 
eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
 
         expectRestoreToBeCompleted(consumer, changeLogReader);
         replay(activeTaskCreator, standbyTaskCreator, consumer, 
changeLogReader);
 
-        taskManager.handleAssignment(runningActiveAssignment, 
standbyAssignment);
-        assertThat(taskManager.tryToCompleteRestoration(), is(true));
-
         taskManager.handleAssignment(allActiveTasksAssignment, 
standbyAssignment);
+        taskManager.tryToCompleteRestoration();
 
         final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
 
@@ -2459,7 +2509,7 @@ public class TaskManagerTest {
             allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : restoringTasks) {
-            assertThat(task.state(), not(Task.State.RUNNING));
+            assertThat(task.state(), is(Task.State.RESTORING));
             allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : standbyTasks) {

Reply via email to