This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit aa306b8acbe41461b9748ee62275897a109989ab Author: A. Sophie Blee-Goldman <[email protected]> AuthorDate: Fri Oct 30 13:28:31 2020 -0700 KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534) Delete the existing checkpoint file if told to write an empty offsets map, to ensure that state is not re-initialized from corrupted offsets. Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang <[email protected]> --- .../streams/state/internals/OffsetCheckpoint.java | 4 +++- .../state/internals/OffsetCheckpointTest.java | 21 ++++++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java index 59afbb3..3ec2386 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java @@ -78,8 +78,10 @@ public class OffsetCheckpoint { * @throws IOException if any file operation fails with an IO exception */ public void write(final Map<TopicPartition, Long> offsets) throws IOException { - // if there is no offsets, skip writing the file to save disk IOs + // if there are no offsets, skip writing the file to save disk IOs + // but make sure to delete the existing file if one exists if (offsets.isEmpty()) { + Utils.delete(file); return; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java index fe871e1..d9ddff1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java @@ -34,6 +34,7 @@ import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEnt import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -74,7 +75,7 @@ public class OffsetCheckpointTest { final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp"); final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f); - checkpoint.write(Collections.<TopicPartition, Long>emptyMap()); + checkpoint.write(Collections.emptyMap()); assertFalse(f.exists()); @@ -85,6 +86,24 @@ public class OffsetCheckpointTest { } @Test + public void shouldDeleteExistingCheckpointWhenNoOffsets() throws IOException { + final File file = TestUtils.tempFile(); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file); + + final Map<TopicPartition, Long> offsets = Collections.singletonMap(new TopicPartition(topic, 0), 1L); + + checkpoint.write(offsets); + + assertThat(file.exists(), is(true)); + assertThat(offsets, is(checkpoint.read())); + + checkpoint.write(Collections.emptyMap()); + + assertThat(file.exists(), is(false)); + assertThat(Collections.<TopicPartition, Long>emptyMap(), is(checkpoint.read())); + } + + @Test public void shouldSkipInvalidOffsetsDuringRead() throws IOException { final File file = TestUtils.tempFile(); final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
