This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new e942823  KAFKA-10287: Skip unknown offsets when computing sum of 
changelog offsets (#9066)
e942823 is described below

commit e942823c86a27184726ae99a290fef2d3a5f2595
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Jul 28 17:59:26 2020 +0200

    KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets 
(#9066)
    
    In PR #8962 we introduced a sentinel OFFSET_UNKNOWN to mark unknown offsets 
in checkpoint files. The sentinel was set to -2, which is the same value used 
for the sentinel LATEST_OFFSET that is used in subscriptions to signal that 
state stores have been used by an active task. Unfortunately, we failed to skip 
OFFSET_UNKNOWN when computing the sum of the changelog offsets.
    
    If a task had only one state store and it did not restore anything before 
the next rebalance, the stream thread wrote -2 (i.e., OFFSET_UNKNOWN) into the 
subscription as the sum of the changelog offsets. During assignment, the leader 
interpreted the -2 as if the stream thread had run the task as active, although 
it might have run it as standby. This misinterpretation of the sentinel value 
resulted in unexpected task assignments.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai 
<[email protected]>, John Roesler <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../streams/processor/internals/TaskManager.java   |  2 +-
 .../streams/state/internals/OffsetCheckpoint.java  |  6 +++--
 .../processor/internals/TaskManagerTest.java       | 30 ++++++++++++----------
 .../state/internals/OffsetCheckpointTest.java      |  5 ++--
 4 files changed, 25 insertions(+), 18 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index ae1dca5..7f019c6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -696,7 +696,7 @@ public class TaskManager {
                 // for this case, the offset of all partitions is set to 
`LATEST_OFFSET`
                 // and we "forward" the sentinel value directly
                 return Task.LATEST_OFFSET;
-            } else {
+            } else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
                 if (offset < 0) {
                     throw new IllegalStateException("Expected not to get a 
sentinel offset, but got: " + changelogEntry);
                 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index a4875fb..59afbb3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -59,8 +59,10 @@ public class OffsetCheckpoint {
     private static final int VERSION = 0;
 
     // Use a negative sentinel when we don't know the offset instead of 
skipping it to distinguish it from dirty state
-    // and use -2 as the -1 sentinel may be taken by some producer errors
-    public static final long OFFSET_UNKNOWN = -2;
+    // and use -4 as the -1 sentinel may be taken by some producer errors and 
-2 in the
+    // subscription means that the state is used by an active task and hence 
caught-up and
+    // -3 is also used in the subscription.
+    public static final long OFFSET_UNKNOWN = -4L;
 
     private final File file;
     private final Object lock;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index d7ea5ed..a7433fa3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -255,19 +255,7 @@ public class TaskManagerTest {
         );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
Task.LATEST_OFFSET));
 
-        expectLockObtainedFor(taskId00);
-        makeTaskFolders(taskId00.toString());
-        replay(stateDirectory);
-
-        taskManager.handleRebalanceStart(singleton("topic"));
-        final StateMachineTask runningTask = handleAssignment(
-            taskId00Assignment,
-            emptyMap(),
-            emptyMap()
-        ).get(taskId00);
-        runningTask.setChangelogOffsets(changelogOffsets);
-
-        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+        computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
     }
 
     @Test
@@ -278,6 +266,22 @@ public class TaskManagerTest {
         );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
15L));
 
+        computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+    }
+
+    @Test
+    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws 
Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 
OffsetCheckpoint.OFFSET_UNKNOWN),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
10L));
+
+        computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+    }
+
+    private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> 
changelogOffsets,
+                                           final Map<TaskId, Long> 
expectedOffsetSums) throws Exception {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
         replay(stateDirectory);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 0a1d874..fe871e1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -104,14 +104,15 @@ public class OffsetCheckpointTest {
     public void shouldReadAndWriteSentinelOffset() throws IOException {
         final File f = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+        final long sentinelOffset = -4L;
 
         try {
             final Map<TopicPartition, Long> offsetsToWrite = new HashMap<>();
-            offsetsToWrite.put(new TopicPartition(topic, 1), -2L);
+            offsetsToWrite.put(new TopicPartition(topic, 1), sentinelOffset);
             checkpoint.write(offsetsToWrite);
 
             final Map<TopicPartition, Long> readOffsets = checkpoint.read();
-            assertThat(readOffsets.get(new TopicPartition(topic, 1)), 
equalTo(-2L));
+            assertThat(readOffsets.get(new TopicPartition(topic, 1)), 
equalTo(sentinelOffset));
         } finally {
             checkpoint.delete();
         }

Reply via email to