This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 4a0496b188cd96f05fbab10a4327714bb8408d17 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Thu Feb 16 16:34:18 2023 -0800 RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823) (cherry picked from commit c4f54dfe88d1b574a03688f16a592cf0d59d13a9) --- .../ratis/server/raftlog/segmented/LogSegment.java | 47 +++++++++++----------- .../raftlog/segmented/SegmentedRaftLogCache.java | 7 +++- .../segmented/SegmentedRaftLogInputStream.java | 15 ++++--- .../raftlog/segmented/SegmentedRaftLogReader.java | 27 +++++++------ .../segmented/SegmentedRaftLogTestUtils.java | 10 +++++ .../statemachine/SimpleStateMachine4Testing.java | 3 +- .../raftlog/segmented/TestCacheEviction.java | 4 +- .../server/raftlog/segmented/TestLogSegment.java | 20 ++++----- .../raftlog/segmented/TestRaftLogReadWrite.java | 7 ++-- .../segmented/TestSegmentedRaftLogCache.java | 3 +- .../java/org/apache/ratis/tools/ParseRatisLog.java | 14 +++++-- 11 files changed, 94 insertions(+), 63 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index c3c4d6e53..b8e0e72ff 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -31,6 +31,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,35 +103,31 @@ public final class LogSegment implements Comparable<Long> { } } - static LogSegment newOpenSegment(RaftStorage storage, long start, SegmentedRaftLogMetrics raftLogMetrics) { + static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize, + SegmentedRaftLogMetrics raftLogMetrics) { Preconditions.assertTrue(start >= 0); - return new LogSegment(storage, true, start, start - 1, raftLogMetrics); + return new LogSegment(storage, true, start, start - 1, maxOpSize, raftLogMetrics); } @VisibleForTesting static LogSegment newCloseSegment(RaftStorage storage, - long start, long end, SegmentedRaftLogMetrics raftLogMetrics) { + long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { Preconditions.assertTrue(start >= 0 && end >= start); - return new LogSegment(storage, false, start, end, raftLogMetrics); - } - - static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SegmentedRaftLogMetrics metrics) { - return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), metrics) - : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), metrics); + return new LogSegment(storage, false, start, end, maxOpSize, raftLogMetrics); } - public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, - CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer) - throws IOException { - return readSegmentFile(file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), - corruptionPolicy, raftLogMetrics, entryConsumer); + static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize, + SegmentedRaftLogMetrics metrics) { + return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), maxOpSize, metrics) + : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), maxOpSize, metrics); } - private static int readSegmentFile(File file, long start, long end, boolean isOpen, + public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize, CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer) throws IOException { int count = 0; - try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) { + try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream( + file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), maxOpSize, raftLogMetrics)) { for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) { if (prev != null) { Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1, @@ -146,8 +143,8 @@ public final class LogSegment implements Comparable<Long> { switch (corruptionPolicy) { case EXCEPTION: throw ioe; case WARN_AND_RETURN: - LOG.warn("Failed to read segment file {} (start={}, end={}, isOpen? {}): only {} entries read successfully", - file, start, end, isOpen, count, ioe); + LOG.warn("Failed to read segment file {} ({}): only {} entries read successfully", + file, startEnd, count, ioe); break; default: throw new IllegalStateException("Unexpected enum value: " + corruptionPolicy @@ -158,13 +155,13 @@ public final class LogSegment implements Comparable<Long> { return count; } - static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, + static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize, boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, SegmentedRaftLogMetrics raftLogMetrics) throws IOException { - final LogSegment segment = newLogSegment(storage, startEnd, raftLogMetrics); + final LogSegment segment = newLogSegment(storage, startEnd, maxOpSize, raftLogMetrics); final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy); final boolean isOpen = startEnd.isOpen(); - final int entryCount = readSegmentFile(file, startEnd, corruptionPolicy, raftLogMetrics, entry -> { + final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> { segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE); if (logConsumer != null) { logConsumer.accept(entry); @@ -235,7 +232,9 @@ public final class LogSegment implements Comparable<Long> { // note the loading should not exceed the endIndex: it is possible that // the on-disk log file should be truncated but has not been done yet. final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>(); - readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics, entry -> { + final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen); + readSegmentFile(file, startEnd, maxOpSize, + getLogCorruptionPolicy(), raftLogMetrics, entry -> { final TermIndex ti = TermIndex.valueOf(entry); putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE); if (ti.equals(key.getTermIndex())) { @@ -259,6 +258,7 @@ public final class LogSegment implements Comparable<Long> { /** Segment end index, inclusive. */ private volatile long endIndex; private RaftStorage storage; + private final SizeInBytes maxOpSize; private final LogEntryLoader cacheLoader; /** later replace it with a metric */ private final AtomicInteger loadingTimes = new AtomicInteger(); @@ -272,12 +272,13 @@ public final class LogSegment implements Comparable<Long> { */ private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>(); - private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, + private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { this.storage = storage; this.isOpen = isOpen; this.startIndex = start; this.endIndex = end; + this.maxOpSize = maxOpSize; this.cacheLoader = new LogEntryLoader(raftLogMetrics); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index b6f932d6a..e42f451d0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -32,6 +32,7 @@ import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.AutoCloseableReadWriteLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -350,6 +351,7 @@ public class SegmentedRaftLogCache { private volatile LogSegment openSegment; private final LogSegmentList closedSegments; private final RaftStorage storage; + private final SizeInBytes maxOpSize; private final SegmentedRaftLogMetrics raftLogMetrics; private final int maxCachedSegments; @@ -367,6 +369,7 @@ public class SegmentedRaftLogCache { this.raftLogMetrics.addOpenSegmentSizeInBytes(this); this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties); this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize(); + this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties); } int getMaxCachedSegments() { @@ -376,7 +379,7 @@ public class SegmentedRaftLogCache { void loadSegment(LogSegmentPath pi, boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) throws IOException { final LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.getStartEnd(), - keepEntryInCache, logConsumer, raftLogMetrics); + maxOpSize, keepEntryInCache, logConsumer, raftLogMetrics); if (logSegment != null) { addSegment(logSegment); } @@ -434,7 +437,7 @@ public class SegmentedRaftLogCache { } void addOpenSegment(long startIndex) { - setOpenSegment(LogSegment.newOpenSegment(storage, startIndex,raftLogMetrics)); + setOpenSegment(LogSegment.newOpenSegment(storage, startIndex, maxOpSize, raftLogMetrics)); } private void setOpenSegment(LogSegment openSegment) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java index e445b1abb..481f837f5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java @@ -28,6 +28,7 @@ import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.OpenCloseState; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,14 +67,12 @@ public class SegmentedRaftLogInputStream implements Closeable { private final boolean isOpen; private final OpenCloseState state; private SegmentedRaftLogReader reader; + private final SizeInBytes maxOpSize; private final SegmentedRaftLogMetrics raftLogMetrics; - public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen) { - this(log, startIndex, endIndex, isOpen, null); - } - SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen, - SegmentedRaftLogMetrics raftLogMetrics) { + SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { + this.maxOpSize = maxOpSize; if (isOpen) { Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX); } else { @@ -92,7 +91,7 @@ public class SegmentedRaftLogInputStream implements Closeable { state.open(); boolean initSuccess = false; try { - reader = new SegmentedRaftLogReader(logFile, raftLogMetrics); + reader = new SegmentedRaftLogReader(logFile, maxOpSize, raftLogMetrics); initSuccess = reader.verifyHeader(); } finally { if (!initSuccess) { @@ -191,11 +190,11 @@ public class SegmentedRaftLogInputStream implements Closeable { * @return Result of the validation * @throws IOException */ - static LogValidation scanEditLog(File file, long maxTxIdToScan) + static LogValidation scanEditLog(File file, long maxTxIdToScan, SizeInBytes maxOpSize) throws IOException { SegmentedRaftLogInputStream in; try { - in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, null); + in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, maxOpSize, null); // read the header, initialize the inputstream in.init(); } catch (EOFException e) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java index dc67d31c4..f1179de84 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java @@ -27,6 +27,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.PureJavaCrc32C; +import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,21 +134,22 @@ class SegmentedRaftLogReader implements Closeable { } } - private static final int MAX_OP_SIZE = 32 * 1024 * 1024; - private final File file; private final LimitedInputStream limiter; private final DataInputStream in; private byte[] temp = new byte[4096]; private final Checksum checksum; private final SegmentedRaftLogMetrics raftLogMetrics; + private final SizeInBytes maxOpSize; - SegmentedRaftLogReader(File file, SegmentedRaftLogMetrics raftLogMetrics) throws FileNotFoundException { + SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) + throws FileNotFoundException { this.file = file; this.limiter = new LimitedInputStream( new BufferedInputStream(new FileInputStream(file))); in = new DataInputStream(limiter); checksum = new PureJavaCrc32C(); + this.maxOpSize = maxOpSize; this.raftLogMetrics = raftLogMetrics; } @@ -273,8 +275,9 @@ class SegmentedRaftLogReader implements Closeable { * @return The log entry, or null if we hit EOF. */ private LogEntryProto decodeEntry() throws IOException { - limiter.setLimit(MAX_OP_SIZE); - in.mark(MAX_OP_SIZE); + final int max = maxOpSize.getSizeInt(); + limiter.setLimit(max); + in.mark(max); byte nextByte; try { @@ -294,17 +297,17 @@ class SegmentedRaftLogReader implements Closeable { // Here, we verify that the Op size makes sense and that the // data matches its checksum before attempting to construct an Op. int entryLength = CodedInputStream.readRawVarint32(nextByte, in); - if (entryLength > MAX_OP_SIZE) { + if (entryLength > max) { throw new IOException("Entry has size " + entryLength - + ", but MAX_OP_SIZE = " + MAX_OP_SIZE); + + ", but MAX_OP_SIZE = " + maxOpSize); } final int varintLength = CodedOutputStream.computeUInt32SizeNoTag( entryLength); final int totalLength = varintLength + entryLength; - checkBufferSize(totalLength); + checkBufferSize(totalLength, max); in.reset(); - in.mark(MAX_OP_SIZE); + in.mark(max); IOUtils.readFully(in, temp, 0, totalLength); // verify checksum @@ -323,12 +326,12 @@ class SegmentedRaftLogReader implements Closeable { CodedInputStream.newInstance(temp, varintLength, entryLength)); } - private void checkBufferSize(int entryLength) { - Preconditions.assertTrue(entryLength <= MAX_OP_SIZE); + private void checkBufferSize(int entryLength, int max) { + Preconditions.assertTrue(entryLength <= max); int length = temp.length; if (length < entryLength) { while (length < entryLength) { - length = Math.min(length * 2, MAX_OP_SIZE); + length = Math.min(length * 2, max); } temp = new byte[length]; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java index 04527e728..5dfa4de10 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java @@ -19,8 +19,18 @@ package org.apache.ratis.server.raftlog.segmented; import org.apache.log4j.Level; import org.apache.ratis.util.Log4jUtils; +import org.apache.ratis.util.SizeInBytes; + +import java.io.File; public interface SegmentedRaftLogTestUtils { + SizeInBytes MAX_OP_SIZE = SizeInBytes.valueOf("32MB"); + + static SegmentedRaftLogInputStream newSegmentedRaftLogInputStream(File log, + long startIndex, long endIndex, boolean isOpen) { + return new SegmentedRaftLogInputStream(log, startIndex, endIndex, isOpen, MAX_OP_SIZE, null); + } + static void setRaftLogWorkerLogLevel(Level level) { Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, level); } diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index cf715585e..122b66e58 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -40,6 +40,7 @@ import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; +import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.JavaUtils; @@ -309,7 +310,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } else { LOG.info("Loading snapshot {}", snapshot); final long endIndex = snapshot.getIndex(); - try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream( + try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream( snapshot.getFile().getPath().toFile(), 0, endIndex, false)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java index 87dd2ef37..d2ff12e75 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java @@ -49,6 +49,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; + public class TestCacheEviction extends BaseTest { private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault(); @@ -56,7 +58,7 @@ public class TestCacheEviction extends BaseTest { Assert.assertEquals(numSegments, cached.length); final LogSegmentList segments = new LogSegmentList(JavaUtils.getClassSimpleName(TestCacheEviction.class)); for (int i = 0; i < numSegments; i++) { - LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, null); + LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, MAX_OP_SIZE, null); if (cached[i]) { s = Mockito.spy(s); Mockito.when(s.hasCache()).thenReturn(true); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java index 6e0af2dab..ee284ae2b 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java @@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom; import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX; import static org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize; +import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; import com.codahale.metrics.Timer; @@ -170,7 +171,7 @@ public class TestLogSegment extends BaseTest { final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten); RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir); final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, - LogSegmentStartEnd.valueOf(0), loadInitial, null, null); + LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, loadInitial, null, null); final int delta = isLastEntryPartiallyWritten? 1: 0; checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0); storage.close(); @@ -180,7 +181,7 @@ public class TestLogSegment extends BaseTest { // load a closed segment (1000-1099) final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false); LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile, - LogSegmentStartEnd.valueOf(1000, 1099L), loadInitial, null, null); + LogSegmentStartEnd.valueOf(1000, 1099L), MAX_OP_SIZE, loadInitial, null, null); checkLogSegment(closedSegment, 1000, 1099, false, closedSegment.getTotalFileSize(), 1); Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes()); @@ -189,7 +190,7 @@ public class TestLogSegment extends BaseTest { @Test public void testAppendEntries() throws Exception { final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(null, start, null); + LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null); long size = SegmentedRaftLogFormat.getHeaderLength(); final long max = 8 * 1024 * 1024; checkLogSegment(segment, start, start - 1, true, size, 0); @@ -215,7 +216,7 @@ public class TestLogSegment extends BaseTest { final File openSegmentFile = prepareLog(true, 0, 100, 0, true); RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir); final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, - LogSegmentStartEnd.valueOf(0), true, null, raftLogMetrics); + LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, true, null, raftLogMetrics); checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0); storage.close(); @@ -228,7 +229,7 @@ public class TestLogSegment extends BaseTest { @Test public void testAppendWithGap() throws Exception { - LogSegment segment = LogSegment.newOpenSegment(null, 1000, null); + LogSegment segment = LogSegment.newOpenSegment(null, 1000, MAX_OP_SIZE, null); SimpleOperation op = new SimpleOperation("m"); final StateMachineLogEntryProto m = op.getLogEntryContent(); try { @@ -255,7 +256,7 @@ public class TestLogSegment extends BaseTest { public void testTruncate() throws Exception { final long term = 1; final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(null, start, null); + LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null); for (int i = 0; i < 100; i++) { LogEntryProto entry = LogProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); @@ -298,7 +299,8 @@ public class TestLogSegment extends BaseTest { new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) { Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a)); } - try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) { + try(SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream( + file, 0, INVALID_LOG_INDEX, true)) { LogEntryProto entry = in.nextEntry(); Assert.assertNull(entry); } @@ -318,8 +320,8 @@ public class TestLogSegment extends BaseTest { } Assert.assertEquals(file.length(), size + SegmentedRaftLogFormat.getHeaderLength()); - try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, - INVALID_LOG_INDEX, true)) { + try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream( + file, 0, INVALID_LOG_INDEX, true)) { LogEntryProto entry = in.nextEntry(); Assert.assertArrayEquals(content, entry.getStateMachineLogEntry().getLogData().toByteArray()); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java index 88b5e2f48..e79f9f7f9 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java @@ -75,7 +75,8 @@ public class TestRaftLogReadWrite extends BaseTest { private LogEntryProto[] readLog(File file, long startIndex, long endIndex, boolean isOpen) throws IOException { List<LogEntryProto> list = new ArrayList<>(); - try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, startIndex, endIndex, isOpen)) { + try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream( + file, startIndex, endIndex, isOpen)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { list.add(entry); @@ -207,8 +208,8 @@ public class TestRaftLogReadWrite extends BaseTest { } List<LogEntryProto> list = new ArrayList<>(); - try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(openSegment, 0, - RaftLog.INVALID_LOG_INDEX, true)) { + try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream( + openSegment, 0, RaftLog.INVALID_LOG_INDEX, true)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { list.add(entry); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index 976e9d6e4..7c5229e5d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.raftlog.segmented; import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*; +import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; import java.io.IOException; import java.util.Iterator; @@ -59,7 +60,7 @@ public class TestSegmentedRaftLogCache { } private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { - LogSegment s = LogSegment.newOpenSegment(null, start, null); + LogSegment s = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null); for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); diff --git a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java index 250a4790a..564ce0bf0 100644 --- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java +++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java @@ -24,6 +24,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.segmented.LogSegmentPath; import org.apache.ratis.server.raftlog.segmented.LogSegment; +import org.apache.ratis.util.SizeInBytes; import java.io.File; import java.io.IOException; @@ -33,15 +34,17 @@ public final class ParseRatisLog { private final File file; private final Function<StateMachineLogEntryProto, String> smLogToString; + private final SizeInBytes maxOpSize; private long numConfEntries; private long numMetadataEntries; private long numStateMachineEntries; private long numInvalidEntries; - private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString) { + private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString, SizeInBytes maxOpSize) { this.file = f; this.smLogToString = smLogToString; + this.maxOpSize = maxOpSize; this.numConfEntries = 0; this.numMetadataEntries = 0; this.numStateMachineEntries = 0; @@ -56,7 +59,7 @@ public final class ParseRatisLog { } System.out.println("Processing Raft Log file: " + file.getAbsolutePath() + " size:" + file.length()); - final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), + final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), maxOpSize, RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, this::processLogEntry); System.out.println("Num Total Entries: " + entryCount); System.out.println("Num Conf Entries: " + numConfEntries); @@ -85,7 +88,12 @@ public final class ParseRatisLog { public static class Builder { private File file = null; private Function<StateMachineLogEntryProto, String> smLogToString = null; + private SizeInBytes maxOpSize = SizeInBytes.valueOf("32MB"); + public Builder setMaxOpSize(SizeInBytes maxOpSize) { + this.maxOpSize = maxOpSize; + return this; + } public Builder setSegmentFile(File segmentFile) { this.file = segmentFile; @@ -98,7 +106,7 @@ public final class ParseRatisLog { } public ParseRatisLog build() { - return new ParseRatisLog(file, smLogToString); + return new ParseRatisLog(file, smLogToString, maxOpSize); } } }
