This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdf68a4  KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory (#8962)
cdf68a4 is described below

commit cdf68a4dae284ef021ae4ed26c5e0128c0cd7224
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Mon Jul 6 17:16:12 2020 -0700

    KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory (#8962)
    
    Two more edge cases I found that produce extra TaskCorruptedExceptions while
    playing around with the failing eos-beta upgrade test (sadly these are
    unrelated problems, as the test still fails with these fixes in place).
    
    * Need to write the checkpoint when recycling a standby: although we do
    preserve the changelog offsets when recycling a task, and should therefore
    write the offsets when the new task is itself closed, we do NOT write the
    checkpoint for uninitialized tasks. So if the new task is ultimately closed
    before it gets out of the CREATED state, the offsets are never written and
    we can get a TaskCorruptedException (see the first sketch below).
    * We do not write the checkpoint file if the current offset map is empty;
    however, for eos the checkpoint file is used not only for restoration but
    also for clean shutdown. Although skipping a dummy checkpoint file does not
    violate correctness, since we would re-bootstrap from the log-start-offset
    anyway, it throws an unnecessary TaskCorruptedException, which has its own
    overhead (see the second sketch below).
    
    Reviewers: John Roesler <vvcep...@apache.org>, Guozhang Wang <wangg...@gmail.com>
---
 .../processor/internals/ProcessorStateManager.java | 18 ++++++++++++---
 .../streams/processor/internals/TaskManager.java   | 23 ++++++++++++++----
 .../streams/state/internals/OffsetCheckpoint.java  | 15 +++++++++---
 .../state/internals/OffsetCheckpointTest.java      | 27 +++++++++++++++++++---
 4 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index fefbfbc..953a4e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -47,6 +47,7 @@ import static java.lang.String.format;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
 
 /**
  * ProcessorStateManager is the source of truth for the current offset for each state store,
@@ -225,7 +226,8 @@ public class ProcessorStateManager implements StateManager {
                     log.info("State store {} is not logged and hence would not 
be restored", store.stateStore.name());
                 } else if (store.offset() == null) {
                     if 
(loadedCheckpoints.containsKey(store.changelogPartition)) {
-                        
store.setOffset(loadedCheckpoints.remove(store.changelogPartition));
+                        final Long offset = 
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
+                        store.setOffset(offset);
 
                         log.debug("State store {} initialized from checkpoint 
with offset {} at changelog {}",
                                   store.stateStore.name(), store.offset, 
store.changelogPartition);
@@ -538,10 +540,10 @@ public class ProcessorStateManager implements StateManager {
             // store is logged, persistent, not corrupted, and has a valid current offset
             if (storeMetadata.changelogPartition != null &&
                 storeMetadata.stateStore.persistent() &&
-                storeMetadata.offset != null &&
                 !storeMetadata.corrupted) {
 
-                checkpointingOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset);
+                final long checkpointableOffset = checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
+                checkpointingOffsets.put(storeMetadata.changelogPartition, checkpointableOffset);
             }
         }
 
@@ -578,4 +580,14 @@ public class ProcessorStateManager implements StateManager {
 
         return found.isEmpty() ? null : found.get(0);
     }
+
+    // Pass in a sentinel value to checkpoint when the changelog offset is not yet initialized/known
+    private long checkpointableOffsetFromChangelogOffset(final Long offset) {
+        return offset != null ? offset : OFFSET_UNKNOWN;
+    }
+
+    // Convert the written offsets in the checkpoint file back to the changelog offset
+    private Long changelogOffsetFromCheckpointedOffset(final long offset) {
+        return offset != OFFSET_UNKNOWN ? offset : null;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 173efff..c52ebdf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
 
@@ -242,9 +243,14 @@ public class TaskManager {
 
         for (final Task task : tasksToClose) {
             try {
-                if (!task.isActive()) {
-                    // Active tasks should have already been suspended and committed during handleRevocation, but
-                    // standbys must be suspended/committed/closed all here
+                if (task.isActive()) {
+                    // Active tasks are revoked and suspended/committed during #handleRevocation
+                    if (!task.state().equals(State.SUSPENDED)) {
+                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
+                                  task.id(), task.state());
+                        throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
+                    }
+                } else {
                     task.suspend();
                     task.prepareCommit();
                     task.postCommit();
@@ -268,10 +274,19 @@ public class TaskManager {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    if (!oldTask.state().equals(State.SUSPENDED)) {
+                        // Active tasks are revoked and suspended/committed during #handleRevocation
+                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
+                                  oldTask.id(), oldTask.state());
+                        throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended");
+                    }
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
-                    oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already
+                    oldTask.suspend();
+                    oldTask.prepareCommit();
+                    oldTask.postCommit();
                     final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
                     newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 003682e..a4875fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -58,6 +58,10 @@ public class OffsetCheckpoint {
 
     private static final int VERSION = 0;
 
+    // Use a negative sentinel when we don't know the offset instead of skipping it to distinguish it from dirty state
+    // and use -2 as the -1 sentinel may be taken by some producer errors
+    public static final long OFFSET_UNKNOWN = -2;
+
     private final File file;
     private final Object lock;
 
@@ -91,7 +95,7 @@ public class OffsetCheckpoint {
                 for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                     final TopicPartition tp = entry.getKey();
                     final Long offset = entry.getValue();
-                    if (offset >= 0L) {
+                    if (isValid(offset)) {
                         writeEntry(writer, tp, offset);
                     } else {
                         LOG.error("Received offset={} to write to checkpoint 
file for {}", offset, tp);
@@ -144,7 +148,7 @@ public class OffsetCheckpoint {
                 final int version = readInt(reader);
                 switch (version) {
                     case 0:
-                        final int expectedSize = readInt(reader);
+                        int expectedSize = readInt(reader);
                        final Map<TopicPartition, Long> offsets = new HashMap<>();
                         String line = reader.readLine();
                         while (line != null) {
@@ -158,10 +162,11 @@ public class OffsetCheckpoint {
                             final int partition = Integer.parseInt(pieces[1]);
                             final TopicPartition tp = new TopicPartition(topic, partition);
                             final long offset = Long.parseLong(pieces[2]);
-                            if (offset >= 0L) {
+                            if (isValid(offset)) {
                                 offsets.put(tp, offset);
                             } else {
                                 LOG.warn("Read offset={} from checkpoint file 
for {}", offset, tp);
+                                --expectedSize;
                             }
 
                             line = reader.readLine();
@@ -204,4 +209,8 @@ public class OffsetCheckpoint {
         return file.getAbsolutePath();
     }
 
+    private boolean isValid(final long offset) {
+        return offset >= 0L || offset == OFFSET_UNKNOWN;
+    }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index cc80d08..0a1d874 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -32,9 +32,12 @@ import org.junit.Test;
 
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEntry;
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 public class OffsetCheckpointTest {
 
@@ -82,7 +85,7 @@ public class OffsetCheckpointTest {
     }
 
     @Test
-    public void shouldSkipNegativeOffsetsDuringRead() throws IOException {
+    public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
         final File file = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
 
@@ -91,20 +94,38 @@ public class OffsetCheckpointTest {
             offsets.put(new TopicPartition(topic, 0), -1L);
 
             writeVersion0(offsets, file);
+            assertTrue(checkpoint.read().isEmpty());
         } finally {
             checkpoint.delete();
         }
     }
 
     @Test
-    public void shouldThrowOnNegativeOffsetInWrite() throws IOException {
+    public void shouldReadAndWriteSentinelOffset() throws IOException {
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        try {
+            final Map<TopicPartition, Long> offsetsToWrite = new HashMap<>();
+            offsetsToWrite.put(new TopicPartition(topic, 1), -2L);
+            checkpoint.write(offsetsToWrite);
+
+            final Map<TopicPartition, Long> readOffsets = checkpoint.read();
            assertThat(readOffsets.get(new TopicPartition(topic, 1)), equalTo(-2L));
+        } finally {
+            checkpoint.delete();
+        }
+    }
+
+    @Test
+    public void shouldThrowOnInvalidOffsetInWrite() throws IOException {
         final File f = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
         try {
             final Map<TopicPartition, Long> offsets = new HashMap<>();
             offsets.put(new TopicPartition(topic, 0), 0L);
-            offsets.put(new TopicPartition(topic, 1), -1L);
+            offsets.put(new TopicPartition(topic, 1), -1L); // invalid
             offsets.put(new TopicPartition(topic, 2), 2L);
 
            assertThrows(IllegalStateException.class, () -> checkpoint.write(offsets));
