This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
     new 688bbaee7 RATIS-2271 Leadership Loss Causes ClosedByInterruptException and NullPointerException in LogAppender Thread (#1245)
688bbaee7 is described below

commit 688bbaee7b7b388c0e1eaf49b210a6a3877e4bd4
Author: GewuNewOne <89496957+rkg...@users.noreply.github.com>
AuthorDate: Sat Apr 5 21:26:18 2025 +0800

    RATIS-2271 Leadership Loss Causes ClosedByInterruptException and NullPointerException in LogAppender Thread (#1245)
---
 .../org/apache/ratis/server/raftlog/segmented/LogSegment.java  | 10 ++++++++--
 .../server/raftlog/segmented/SegmentedRaftLogInputStream.java  |  7 ++++++-
 .../ratis/server/raftlog/segmented/SegmentedRaftLogReader.java | 10 +++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2549e13e0..712b0f973 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -292,7 +292,11 @@ public final class LogSegment {
         }
       });
       loadingTimes.incrementAndGet();
-      return Objects.requireNonNull(toReturn.get(), () -> "toReturn == null for " + key);
+      final ReferenceCountedObject<LogEntryProto> proto = toReturn.get();
+      if (proto == null) {
+        throw new RaftLogIOException("Failed to load log entry " + key);
+      }
+      return proto;
     }
   }
 
@@ -502,8 +506,10 @@ public final class LogSegment {
     }
     try {
       return cacheLoader.load(ti);
+    } catch (RaftLogIOException e) {
+      throw e;
     } catch (Exception e) {
-      throw new RaftLogIOException(e);
+      throw new RaftLogIOException("Failed to loadCache for log entry " + ti, e);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index 050c472dd..3cc8767fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Optional;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -104,7 +105,11 @@ public class SegmentedRaftLogInputStream implements Closeable {
       try {
         init();
       } catch (Exception e) {
-        LOG.error("caught exception initializing " + this, e);
+        if (e.getCause() instanceof ClosedByInterruptException) {
+          LOG.warn("Initialization is interrupted: {}", this, e);
+        } else {
+          LOG.error("Failed to initialize {}", this, e);
+        }
         throw IOUtils.asIOException(e);
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index 7d03105b9..57baffb2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -42,6 +42,7 @@ import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Optional;
 import java.util.zip.Checksum;
 
@@ -169,7 +170,14 @@ class SegmentedRaftLogReader implements Closeable {
    */
   boolean verifyHeader() throws IOException {
     final int headerLength = SegmentedRaftLogFormat.getHeaderLength();
-    final int readLength = in.read(temp, 0, headerLength);
+    final int readLength;
+    try{
+      readLength = in.read(temp, 0, headerLength);
+    } catch (ClosedByInterruptException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while reading the header of " + file, e);
+    }
+
     Preconditions.assertTrue(readLength <= headerLength);
     final int matchLength = SegmentedRaftLogFormat.matchHeader(temp, 0, readLength);
     Preconditions.assertTrue(matchLength <= readLength);