This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 650498a KAFKA-13078: Fix a bug where we were closing the
RawSnapshotWriter too early (#11040)
650498a is described below
commit 650498a8de8f8c093d165c135677720426cdbcdc
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jul 13 16:20:03 2021 -0700
KAFKA-13078: Fix a bug where we were closing the RawSnapshotWriter too early
(#11040)
Reviewers: David Arthur <[email protected]>
---
.../java/org/apache/kafka/raft/FollowerState.java | 4 ++--
.../kafka/snapshot/FileRawSnapshotWriter.java | 17 +++++++++++-----
.../kafka/snapshot/MockRawSnapshotWriter.java | 23 ++++++++++++++++------
3 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index c86c5c8..e3a3047 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -137,11 +137,11 @@ public class FollowerState implements EpochState {
return fetchingSnapshot;
}
- public void setFetchingSnapshot(Optional<RawSnapshotWriter>
fetchingSnapshot) {
+ public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
if (fetchingSnapshot.isPresent()) {
fetchingSnapshot.get().close();
}
- this.fetchingSnapshot = fetchingSnapshot;
+ fetchingSnapshot = newSnapshot;
}
@Override
diff --git
a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
index f3f2222..badefd3 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
@@ -60,8 +60,12 @@ public final class FileRawSnapshotWriter implements
RawSnapshotWriter {
return channel.size();
} catch (IOException e) {
throw new UncheckedIOException(
- String.format("Error calculating snapshot size.temp path = %s,
snapshotId = %s.",
- this.tempSnapshotPath, this.snapshotId), e);
+ String.format(
+ "Error calculating snapshot size. temp path = %s,
snapshotId = %s.",
+ tempSnapshotPath,
+ snapshotId),
+ e
+ );
}
}
@@ -177,14 +181,17 @@ public final class FileRawSnapshotWriter implements
RawSnapshotWriter {
try {
return new FileRawSnapshotWriter(
path,
- FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE,
StandardOpenOption.APPEND)),
+ FileChannel.open(path, StandardOpenOption.WRITE,
StandardOpenOption.APPEND),
snapshotId,
replicatedLog
);
} catch (IOException e) {
throw new UncheckedIOException(
- String.format("Error creating snapshot writer, " +
- "temp path = %s, snapshotId %s.", path, snapshotId),
+ String.format(
+ "Error creating snapshot writer. path = %s, snapshotId
%s.",
+ path,
+ snapshotId
+ ),
e
);
}
diff --git
a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
index 6788692..0b5cc66 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
@@ -29,6 +29,7 @@ public final class MockRawSnapshotWriter implements
RawSnapshotWriter {
private final Consumer<ByteBuffer> frozenHandler;
private boolean frozen = false;
+ private boolean closed = false;
public MockRawSnapshotWriter(
OffsetAndEpoch snapshotId,
@@ -45,19 +46,19 @@ public final class MockRawSnapshotWriter implements
RawSnapshotWriter {
@Override
public long sizeInBytes() {
- ensureNotFrozen();
+ ensureNotFrozenOrClosed();
return data.position();
}
@Override
public void append(UnalignedMemoryRecords records) {
- ensureNotFrozen();
+ ensureNotFrozenOrClosed();
data.write(records.buffer());
}
@Override
public void append(MemoryRecords records) {
- ensureNotFrozen();
+ ensureNotFrozenOrClosed();
data.write(records.buffer());
}
@@ -68,7 +69,7 @@ public final class MockRawSnapshotWriter implements
RawSnapshotWriter {
@Override
public void freeze() {
- ensureNotFrozen();
+ ensureNotFrozenOrClosed();
frozen = true;
ByteBuffer buffer = data.buffer();
@@ -78,16 +79,26 @@ public final class MockRawSnapshotWriter implements
RawSnapshotWriter {
}
@Override
- public void close() {}
+ public void close() {
+ ensureOpen();
+ closed = true;
+ }
@Override
public String toString() {
return String.format("MockRawSnapshotWriter(snapshotId=%s, data=%s)",
snapshotId, data.buffer());
}
- private void ensureNotFrozen() {
+ private void ensureNotFrozenOrClosed() {
if (frozen) {
throw new IllegalStateException("Snapshot is already frozen " +
snapshotId);
}
+ ensureOpen();
+ }
+
+ private void ensureOpen() {
+ if (closed) {
+ throw new IllegalStateException("Snapshot is already closed " +
snapshotId);
+ }
}
}