This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new ba0f04b  KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (#9247)
ba0f04b is described below

commit ba0f04b569cae036ba2c79c91a32af2bba5397e9
Author: Sharath Bhat <[email protected]>
AuthorDate: Wed Oct 7 23:50:06 2020 +0530

    KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (#9247)
    
    Delete the checkpoint file, when EOS is enabled, before the task transitions from the SUSPENDED state to the RESTORING state.
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../processor/internals/ProcessorStateManager.java |  6 +++++
 .../streams/processor/internals/StreamTask.java    |  9 +++++++
 .../internals/ProcessorStateManagerTest.java       | 30 ++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1de599d..deadc68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -598,4 +598,10 @@ public class ProcessorStateManager implements StateManager {
     private Long changelogOffsetFromCheckpointedOffset(final long offset) {
         return offset != OFFSET_UNKNOWN ? offset : null;
     }
+
+    public void deleteCheckPointFileIfEOSEnabled() throws IOException { // deletes this manager's checkpoint file, but only when EOS is enabled
+        if (eosEnabled) { // when eosEnabled is false the method does nothing
+            checkpointFile.delete(); // NOTE(review): presumably the source of the declared IOException — confirm against the checkpoint file class
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5ed6e52..61a94c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -329,6 +329,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             case SUSPENDED:
                 // just transit the state without any logical changes: suspended and restoring states
                 // are not actually any different for inner modules
+
+                // Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+                try {
+                    stateMgr.deleteCheckPointFileIfEOSEnabled();
+                    log.debug("Deleted check point file upon resuming with EOS 
enabled");
+                } catch (final IOException ioe) {
+                    log.error("Encountered error while deleting the checkpoint 
file due to this exception", ioe);
+                }
+
                 transitionTo(State.RESTORING);
                 log.info("Resumed to restoring state");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8d4e35b..a3e5a75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -891,6 +891,36 @@ public class ProcessorStateManagerTest {
         stateMgr.close();
     }
 
+    @Test
+    public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException { // EOS enabled: the checkpoint file must not exist afterwards
+        final long checkpointOffset = 10L;
+        final Map<TopicPartition, Long> offsets = mkMap(
+                mkEntry(persistentStorePartition, checkpointOffset),
+                mkEntry(nonPersistentStorePartition, checkpointOffset),
+                mkEntry(irrelevantPartition, 999L)
+        );
+        checkpoint.write(offsets); // write offsets first; presumably this creates checkpointFile on disk — confirm against the test fixture
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true); // second argument is eosEnabled
+        stateMgr.deleteCheckPointFileIfEOSEnabled();
+        stateMgr.close();
+        assertFalse(checkpointFile.exists()); // checked only after close() has run
+    }
+
+    @Test
+    public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws 
IOException { // EOS disabled: the checkpoint file must still exist afterwards
+        final long checkpointOffset = 10L;
+        final Map<TopicPartition, Long> offsets = mkMap(
+                mkEntry(persistentStorePartition, checkpointOffset),
+                mkEntry(nonPersistentStorePartition, checkpointOffset),
+                mkEntry(irrelevantPartition, 999L)
+        );
+        checkpoint.write(offsets); // write offsets first; presumably this creates checkpointFile on disk — confirm against the test fixture
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, false); // second argument is eosEnabled
+        stateMgr.deleteCheckPointFileIfEOSEnabled(); // called with EOS off
+        stateMgr.close();
+        assertTrue(checkpointFile.exists()); // checked only after close() has run
+    }
+
     private ProcessorStateManager getStateManager(final Task.TaskType 
taskType, final boolean eosEnabled) {
         return new ProcessorStateManager(
             taskId,

Reply via email to