This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new e942823 KAFKA-10287: Skip unknown offsets when computing sum of
changelog offsets (#9066)
e942823 is described below
commit e942823c86a27184726ae99a290fef2d3a5f2595
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Jul 28 17:59:26 2020 +0200
KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
(#9066)
In PR #8962 we introduced a sentinel OFFSET_UNKNOWN to mark unknown offsets
in checkpoint files. The sentinel was set to -2, which is the same value used
for the sentinel LATEST_OFFSET that is used in subscriptions to signal that
state stores have been used by an active task. Unfortunately, we forgot to skip
OFFSET_UNKNOWN when computing the sum of the changelog offsets.
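If a task had only one state store and it did not restore anything before
the next rebalance, the stream thread wrote -2 (i.e., OFFSET_UNKNOWN) into the
subscription as the sum of the changelog offsets. During assignment, the leader
interpreted the -2 as if the stream thread had run the task as active, although
it might have run it as a standby. This misinterpretation of the sentinel value
resulted in unexpected task assignments.
This commit skips OFFSET_UNKNOWN when computing the sum of the changelog offsets
and changes its value to -4, so that it no longer collides with the -2 and -3
sentinels used in the subscription.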
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai
<[email protected]>, John Roesler <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/processor/internals/TaskManager.java | 2 +-
.../streams/state/internals/OffsetCheckpoint.java | 6 +++--
.../processor/internals/TaskManagerTest.java | 30 ++++++++++++----------
.../state/internals/OffsetCheckpointTest.java | 5 ++--
4 files changed, 25 insertions(+), 18 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index ae1dca5..7f019c6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -696,7 +696,7 @@ public class TaskManager {
// for this case, the offset of all partitions is set to
`LATEST_OFFSET`
// and we "forward" the sentinel value directly
return Task.LATEST_OFFSET;
- } else {
+ } else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
if (offset < 0) {
throw new IllegalStateException("Expected not to get a
sentinel offset, but got: " + changelogEntry);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index a4875fb..59afbb3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -59,8 +59,10 @@ public class OffsetCheckpoint {
private static final int VERSION = 0;
// Use a negative sentinel when we don't know the offset instead of
skipping it to distinguish it from dirty state
- // and use -2 as the -1 sentinel may be taken by some producer errors
- public static final long OFFSET_UNKNOWN = -2;
+ // and use -4 as the -1 sentinel may be taken by some producer errors and
-2 in the
+ // subscription means that the state is used by an active task and hence
caught-up and
+ // -3 is also used in the subscription.
+ public static final long OFFSET_UNKNOWN = -4L;
private final File file;
private final Object lock;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index d7ea5ed..a7433fa3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -255,19 +255,7 @@ public class TaskManagerTest {
);
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
Task.LATEST_OFFSET));
- expectLockObtainedFor(taskId00);
- makeTaskFolders(taskId00.toString());
- replay(stateDirectory);
-
- taskManager.handleRebalanceStart(singleton("topic"));
- final StateMachineTask runningTask = handleAssignment(
- taskId00Assignment,
- emptyMap(),
- emptyMap()
- ).get(taskId00);
- runningTask.setChangelogOffsets(changelogOffsets);
-
- assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+ computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}
@Test
@@ -278,6 +266,22 @@ public class TaskManagerTest {
);
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
15L));
+ computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+ }
+
+ @Test
+ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws
Exception {
+ final Map<TopicPartition, Long> changelogOffsets = mkMap(
+ mkEntry(new TopicPartition("changelog", 0),
OffsetCheckpoint.OFFSET_UNKNOWN),
+ mkEntry(new TopicPartition("changelog", 1), 10L)
+ );
+ final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
10L));
+
+ computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+ }
+
+ private void computeOffsetSumAndVerify(final Map<TopicPartition, Long>
changelogOffsets,
+ final Map<TaskId, Long>
expectedOffsetSums) throws Exception {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
replay(stateDirectory);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 0a1d874..fe871e1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -104,14 +104,15 @@ public class OffsetCheckpointTest {
public void shouldReadAndWriteSentinelOffset() throws IOException {
final File f = TestUtils.tempFile();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+ final long sentinelOffset = -4L;
try {
final Map<TopicPartition, Long> offsetsToWrite = new HashMap<>();
- offsetsToWrite.put(new TopicPartition(topic, 1), -2L);
+ offsetsToWrite.put(new TopicPartition(topic, 1), sentinelOffset);
checkpoint.write(offsetsToWrite);
final Map<TopicPartition, Long> readOffsets = checkpoint.read();
- assertThat(readOffsets.get(new TopicPartition(topic, 1)),
equalTo(-2L));
+ assertThat(readOffsets.get(new TopicPartition(topic, 1)),
equalTo(sentinelOffset));
} finally {
checkpoint.delete();
}