This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit aa306b8acbe41461b9748ee62275897a109989ab
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Oct 30 13:28:31 2020 -0700

    KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)
    
    Delete the existing checkpoint file if told to write an empty offsets map,
    to ensure that corrupted offsets are not re-initialized from it.
    
    Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang <[email protected]>
---
 .../streams/state/internals/OffsetCheckpoint.java   |  4 +++-
 .../state/internals/OffsetCheckpointTest.java       | 21 ++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 59afbb3..3ec2386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -78,8 +78,10 @@ public class OffsetCheckpoint {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
-        // if there is no offsets, skip writing the file to save disk IOs
+        // if there are no offsets, skip writing the file to save disk IOs
+        // but make sure to delete the existing file if one exists
         if (offsets.isEmpty()) {
+            Utils.delete(file);
             return;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index fe871e1..d9ddff1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -34,6 +34,7 @@ import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEnt
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -74,7 +75,7 @@ public class OffsetCheckpointTest {
         final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.emptyMap());
 
         assertFalse(f.exists());
 
@@ -85,6 +86,24 @@ public class OffsetCheckpointTest {
     }
 
     @Test
+    public void shouldDeleteExistingCheckpointWhenNoOffsets() throws IOException {
+        final File file = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(new TopicPartition(topic, 0), 1L);
+
+        checkpoint.write(offsets);
+
+        assertThat(file.exists(), is(true));
+        assertThat(offsets, is(checkpoint.read()));
+
+        checkpoint.write(Collections.emptyMap());
+
+        assertThat(file.exists(), is(false));
+        assertThat(Collections.<TopicPartition, Long>emptyMap(), is(checkpoint.read()));
+    }
+
+    @Test
     public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
         final File file = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
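
For context, here is a minimal, self-contained sketch (not part of the commit) of the
behavior this patch guarantees. It relies only on the OffsetCheckpoint methods exercised
in the diff above (the constructor, write() and read()); the class name
CheckpointDeletionSketch and the temp-file setup are illustrative assumptions.

    import java.io.File;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointDeletionSketch {
        public static void main(final String[] args) throws Exception {
            // Illustrative temp file; any writable path works the same way.
            final File file = File.createTempFile("checkpoint", ".tmp");
            final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);

            // Write one offset: the checkpoint file exists and holds it.
            checkpoint.write(Collections.singletonMap(new TopicPartition("topic", 0), 1L));

            // Before the patch, writing an empty map skipped the write entirely and
            // left the old file in place; with the patch, the stale file is deleted.
            checkpoint.write(Collections.emptyMap());

            // read() on a missing file returns an empty map, so the stale offset
            // can no longer be re-initialized from the old checkpoint file.
            final Map<TopicPartition, Long> offsets = checkpoint.read();
            System.out.println(offsets.isEmpty());  // prints "true" with the fix
        }
    }

Without the fix, the second write() was a no-op, so a later read() would have returned
the stale offset rather than an empty map, which is exactly the corruption scenario the
commit message describes.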
