This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new ba0f04b KAFKA-10362: When resuming Streams active task with EOS, the
checkpoint file is deleted (#9247)
ba0f04b is described below
commit ba0f04b569cae036ba2c79c91a32af2bba5397e9
Author: Sharath Bhat <[email protected]>
AuthorDate: Wed Oct 7 23:50:06 2020 +0530
KAFKA-10362: When resuming Streams active task with EOS, the checkpoint
file is deleted (#9247)
Delete the checkpoint file before transitioning a resumed active task from
the SUSPENDED state to the RESTORING state when EOS is enabled
Reviewers: Guozhang Wang <[email protected]>
---
.../processor/internals/ProcessorStateManager.java | 6 +++++
.../streams/processor/internals/StreamTask.java | 9 +++++++
.../internals/ProcessorStateManagerTest.java | 30 ++++++++++++++++++++++
3 files changed, 45 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1de599d..deadc68 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -598,4 +598,10 @@ public class ProcessorStateManager implements StateManager
{
private Long changelogOffsetFromCheckpointedOffset(final long offset) {
return offset != OFFSET_UNKNOWN ? offset : null;
}
+
+ public void deleteCheckPointFileIfEOSEnabled() throws IOException {
+ if (eosEnabled) {
+ checkpointFile.delete();
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5ed6e52..61a94c4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -329,6 +329,15 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
case SUSPENDED:
// just transit the state without any logical changes:
suspended and restoring states
// are not actually any different for inner modules
+
+ // Deleting checkpoint file before transition to RESTORING
state (KAFKA-10362)
+ try {
+ stateMgr.deleteCheckPointFileIfEOSEnabled();
+ log.debug("Deleted check point file upon resuming with EOS
enabled");
+ } catch (final IOException ioe) {
+ log.error("Encountered error while deleting the checkpoint
file due to this exception", ioe);
+ }
+
transitionTo(State.RESTORING);
log.info("Resumed to restoring state");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8d4e35b..a3e5a75 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -891,6 +891,36 @@ public class ProcessorStateManagerTest {
stateMgr.close();
}
+ @Test
+ public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
+ final long checkpointOffset = 10L;
+ final Map<TopicPartition, Long> offsets = mkMap(
+ mkEntry(persistentStorePartition, checkpointOffset),
+ mkEntry(nonPersistentStorePartition, checkpointOffset),
+ mkEntry(irrelevantPartition, 999L)
+ );
+ checkpoint.write(offsets);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ stateMgr.deleteCheckPointFileIfEOSEnabled();
+ stateMgr.close();
+ assertFalse(checkpointFile.exists());
+ }
+
+ @Test
+ public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws
IOException {
+ final long checkpointOffset = 10L;
+ final Map<TopicPartition, Long> offsets = mkMap(
+ mkEntry(persistentStorePartition, checkpointOffset),
+ mkEntry(nonPersistentStorePartition, checkpointOffset),
+ mkEntry(irrelevantPartition, 999L)
+ );
+ checkpoint.write(offsets);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, false);
+ stateMgr.deleteCheckPointFileIfEOSEnabled();
+ stateMgr.close();
+ assertTrue(checkpointFile.exists());
+ }
+
private ProcessorStateManager getStateManager(final Task.TaskType
taskType, final boolean eosEnabled) {
return new ProcessorStateManager(
taskId,