This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 650498a  KAFKA-13078: Fix a bug where we were closing the 
RawSnapshotWriter too early (#11040)
650498a is described below

commit 650498a8de8f8c093d165c135677720426cdbcdc
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jul 13 16:20:03 2021 -0700

    KAFKA-13078: Fix a bug where we were closing the RawSnapshotWriter too early 
(#11040)
    
    Reviewers: David Arthur <[email protected]>
---
 .../java/org/apache/kafka/raft/FollowerState.java  |  4 ++--
 .../kafka/snapshot/FileRawSnapshotWriter.java      | 17 +++++++++++-----
 .../kafka/snapshot/MockRawSnapshotWriter.java      | 23 ++++++++++++++++------
 3 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java 
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index c86c5c8..e3a3047 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -137,11 +137,11 @@ public class FollowerState implements EpochState {
         return fetchingSnapshot;
     }
 
-    public void setFetchingSnapshot(Optional<RawSnapshotWriter> 
fetchingSnapshot) {
+    public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
         if (fetchingSnapshot.isPresent()) {
             fetchingSnapshot.get().close();
         }
-        this.fetchingSnapshot = fetchingSnapshot;
+        fetchingSnapshot = newSnapshot;
     }
 
     @Override
diff --git 
a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java 
b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
index f3f2222..badefd3 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
@@ -60,8 +60,12 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
             return channel.size();
         } catch (IOException e) {
             throw new UncheckedIOException(
-                String.format("Error calculating snapshot size.temp path = %s, 
snapshotId = %s.",
-                    this.tempSnapshotPath, this.snapshotId), e);
+                String.format(
+                    "Error calculating snapshot size. temp path = %s, 
snapshotId = %s.",
+                    tempSnapshotPath,
+                    snapshotId),
+                e
+            );
         }
     }
 
@@ -177,14 +181,17 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
         try {
             return new FileRawSnapshotWriter(
                 path,
-                FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, 
StandardOpenOption.APPEND)),
+                FileChannel.open(path, StandardOpenOption.WRITE, 
StandardOpenOption.APPEND),
                 snapshotId,
                 replicatedLog
             );
         } catch (IOException e) {
             throw new UncheckedIOException(
-                String.format("Error creating snapshot writer, " +
-                    "temp path = %s, snapshotId %s.", path, snapshotId),
+                String.format(
+                    "Error creating snapshot writer. path = %s, snapshotId 
%s.",
+                    path,
+                    snapshotId
+                ),
                 e
             );
         }
diff --git 
a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java 
b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
index 6788692..0b5cc66 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
@@ -29,6 +29,7 @@ public final class MockRawSnapshotWriter implements 
RawSnapshotWriter {
     private final Consumer<ByteBuffer> frozenHandler;
 
     private boolean frozen = false;
+    private boolean closed = false;
 
     public MockRawSnapshotWriter(
         OffsetAndEpoch snapshotId,
@@ -45,19 +46,19 @@ public final class MockRawSnapshotWriter implements 
RawSnapshotWriter {
 
     @Override
     public long sizeInBytes() {
-        ensureNotFrozen();
+        ensureNotFrozenOrClosed();
         return data.position();
     }
 
     @Override
     public void append(UnalignedMemoryRecords records) {
-        ensureNotFrozen();
+        ensureNotFrozenOrClosed();
         data.write(records.buffer());
     }
 
     @Override
     public void append(MemoryRecords records) {
-        ensureNotFrozen();
+        ensureNotFrozenOrClosed();
         data.write(records.buffer());
     }
 
@@ -68,7 +69,7 @@ public final class MockRawSnapshotWriter implements 
RawSnapshotWriter {
 
     @Override
     public void freeze() {
-        ensureNotFrozen();
+        ensureNotFrozenOrClosed();
 
         frozen = true;
         ByteBuffer buffer = data.buffer();
@@ -78,16 +79,26 @@ public final class MockRawSnapshotWriter implements 
RawSnapshotWriter {
     }
 
     @Override
-    public void close() {}
+    public void close() {
+        ensureOpen();
+        closed = true;
+    }
 
     @Override
     public String toString() {
         return String.format("MockRawSnapshotWriter(snapshotId=%s, data=%s)", 
snapshotId, data.buffer());
     }
 
-    private void ensureNotFrozen() {
+    private void ensureNotFrozenOrClosed() {
         if (frozen) {
             throw new IllegalStateException("Snapshot is already frozen " + 
snapshotId);
         }
+        ensureOpen();
+    }
+
+    private void ensureOpen() {
+        if (closed) {
+            throw new IllegalStateException("Snapshot is already closed " + 
snapshotId);
+        }
     }
 }

Reply via email to