Repository: incubator-ratis
Updated Branches:
  refs/heads/master 30cc77670 -> 22b70e9c4

RATIS-136. Reduce log level for segment rollover. Contributed by Mukul Kumar Singh

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/22b70e9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/22b70e9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/22b70e9c

Branch: refs/heads/master
Commit: 22b70e9c4f7d1fc3895d38cde66c5e6fa1654d7e
Parents: 30cc776
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Tue Nov 7 18:08:42 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Tue Nov 7 18:09:20 2017 -0800

----------------------------------------------------------------------
 .../apache/ratis/server/impl/FollowerState.java | 5 +--
 .../ratis/server/storage/LogOutputStream.java | 8 ++---
 .../ratis/server/storage/RaftLogWorker.java | 23 ++++++++++----
 .../server/storage/TestRaftLogReadWrite.java | 33 +++++++++++++-------
 .../server/storage/TestRaftLogSegment.java | 31 +++++++++---------
 .../server/storage/TestSegmentedRaftLog.java | 12 ++++++-
 .../SimpleStateMachine4Testing.java | 10 +++++-
 7 files changed, 82 insertions(+), 40 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 0a44e2f..3fb5ecb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -40,7 +40,8 @@ class FollowerState extends Daemon { void updateLastRpcTime(boolean inLogSync) { lastRpcTime = new Timestamp(); - LOG.trace("{} update last rpc 
time to {}", server.getId(), lastRpcTime); + LOG.trace("{} update last rpc time to {} {}", server.getId(), + lastRpcTime, inLogSync); this.inLogSync = inLogSync; } @@ -69,7 +70,7 @@ class FollowerState extends Daemon { synchronized (server) { if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) { LOG.info("{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms", - server.getId(), lastRpcTime, electionTimeout); + server.getId(), lastRpcTime.elapsedTimeMs(), electionTimeout); // election timeout, should become a candidate server.changeToCandidate(); break; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java index db0789e..80e344c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java @@ -57,18 +57,18 @@ public class LogOutputStream implements Closeable { private final long preallocatedSize; private long preallocatedPos; - public LogOutputStream(File file, boolean append, RaftProperties properties) + public LogOutputStream(File file, boolean append, long segmentMaxSize, + long preallocatedSize, int bufferSize) throws IOException { this.file = file; this.checksum = new PureJavaCrc32C(); - this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.segmentMaxSize = segmentMaxSize; + this.preallocatedSize = preallocatedSize; RandomAccessFile rp = new RandomAccessFile(file, "rw"); fc = rp.getChannel(); fc.position(fc.size()); preallocatedPos = fc.size(); - final int 
bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); out = new BufferedWriteChannel(fc, bufferSize); if (!append) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 1858de6..e80ca02 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -71,7 +71,9 @@ class RaftLogWorker implements Runnable { private final int forceSyncNum; - private final RaftProperties properties; + private final long segmentMaxSize; + private final long preallocatedSize; + private final int bufferSize; RaftLogWorker(RaftPeerId selfId, RaftServerImpl raftServer, RaftStorage storage, RaftProperties properties) { @@ -81,7 +83,12 @@ class RaftLogWorker implements Runnable { this.raftServer = raftServer; this.storage = storage; - this.properties = properties; + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties); this.workerThread = new Thread(this, name); } @@ -92,7 +99,8 @@ class RaftLogWorker implements Runnable { flushedIndex = latestIndex; if (openSegmentFile != null) { Preconditions.assertTrue(openSegmentFile.exists()); - out = new LogOutputStream(openSegmentFile, true, properties); + out = new LogOutputStream(openSegmentFile, true, segmentMaxSize, + preallocatedSize, bufferSize); } workerThread.start(); } @@ -227,6 +235,8 @@ 
class RaftLogWorker implements Runnable { } void rollLogSegment(LogSegment segmentToClose) { + LOG.info("Rolling segment:{} index to:{}", name, + (segmentToClose.getEndIndex() + 1)); addIOTask(new FinalizeLogSegment(segmentToClose)); addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); } @@ -285,7 +295,7 @@ class RaftLogWorker implements Runnable { File openFile = storage.getStorageDir() .getOpenLogFile(segmentToClose.getStartIndex()); - LOG.info("{} finalizing log segment {}", name, openFile); + LOG.debug("{} finalizing log segment {}", name, openFile); Preconditions.assertTrue(openFile.exists(), () -> name + ": File " + openFile + " does not exist, segmentToClose=" + segmentToClose.toDebugString()); @@ -323,11 +333,12 @@ class RaftLogWorker implements Runnable { @Override void execute() throws IOException { File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex); - LOG.info("{} creating new log segment {}", name, openFile); + LOG.debug("{} creating new log segment {}", name, openFile); Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s", openFile, name); Preconditions.assertTrue(out == null && pendingFlushNum == 0); - out = new LogOutputStream(openFile, false, properties); + out = new LogOutputStream(openFile, false, segmentMaxSize, + preallocatedSize, bufferSize); Preconditions.assertTrue(openFile.exists(), "Failed to create file %s for %s", openFile.getAbsolutePath(), name); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java index d05ffda..cb512a5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -51,13 +51,21 @@ public class TestRaftLogReadWrite extends BaseTest { private static final long callId = 0; private File storageDir; - private RaftProperties properties; + private long segmentMaxSize; + private long preallocatedSize; + private int bufferSize; @Before public void setup() throws Exception { storageDir = getTestDir(); - properties = new RaftProperties(); + RaftProperties properties = new RaftProperties(); RaftServerConfigKeys.setStorageDir(properties, storageDir); + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); } @After @@ -105,7 +113,8 @@ public class TestRaftLogReadWrite extends BaseTest { final LogEntryProto[] entries = new LogEntryProto[100]; try (LogOutputStream out = - new LogOutputStream(openSegment, false, properties)) { + new LogOutputStream(openSegment, false, segmentMaxSize, + preallocatedSize, bufferSize)) { size += writeMessages(entries, out); } finally { storage.close(); @@ -124,7 +133,8 @@ public class TestRaftLogReadWrite extends BaseTest { File openSegment = storage.getStorageDir().getOpenLogFile(0); LogEntryProto[] entries = new LogEntryProto[200]; try (LogOutputStream out = - new LogOutputStream(openSegment, false, properties)) { + new LogOutputStream(openSegment, false, segmentMaxSize, + preallocatedSize, bufferSize)) { for (int i = 0; i < 100; i++) { SimpleOperation m = new SimpleOperation("m" + i); entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, @@ -134,7 +144,8 @@ public class TestRaftLogReadWrite extends BaseTest { } try (LogOutputStream out = - new LogOutputStream(openSegment, true, properties)) { + new LogOutputStream(openSegment, true, segmentMaxSize, + preallocatedSize, bufferSize)) { for (int i 
= 100; i < 200; i++) { SimpleOperation m = new SimpleOperation("m" + i); entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, @@ -161,7 +172,8 @@ public class TestRaftLogReadWrite extends BaseTest { long size = SegmentedRaftLog.HEADER_BYTES.length; LogEntryProto[] entries = new LogEntryProto[100]; - LogOutputStream out = new LogOutputStream(openSegment, false, properties); + LogOutputStream out = new LogOutputStream(openSegment, false, + segmentMaxSize, preallocatedSize, bufferSize); size += writeMessages(entries, out); out.flush(); @@ -185,14 +197,12 @@ public class TestRaftLogReadWrite extends BaseTest { */ @Test public void testReadWithCorruptPadding() throws IOException { - RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf("4MB")); - RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf("16MB")); - final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); File openSegment = storage.getStorageDir().getOpenLogFile(0); LogEntryProto[] entries = new LogEntryProto[10]; - LogOutputStream out = new LogOutputStream(openSegment, false, properties); + LogOutputStream out = new LogOutputStream(openSegment, false, + 16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize); for (int i = 0; i < 10; i++) { SimpleOperation m = new SimpleOperation("m" + i); entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, @@ -240,7 +250,8 @@ public class TestRaftLogReadWrite extends BaseTest { RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); File openSegment = storage.getStorageDir().getOpenLogFile(0); try (LogOutputStream out = - new LogOutputStream(openSegment, false, properties)) { + new LogOutputStream(openSegment, false, segmentMaxSize, + preallocatedSize, bufferSize)) { for (int i = 0; i < 100; i++) { LogEntryProto entry = ProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), 0, i, 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 69d78f7..26477c2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -52,12 +52,21 @@ public class TestRaftLogSegment extends BaseTest { private static final long callId = 0; private File storageDir; - private final RaftProperties properties = new RaftProperties(); + private long segmentMaxSize; + private long preallocatedSize; + private int bufferSize; @Before public void setup() throws Exception { + RaftProperties properties = new RaftProperties(); storageDir = getTestDir(); RaftServerConfigKeys.setStorageDir(properties, storageDir); + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); } @After @@ -74,7 +83,8 @@ public class TestRaftLogSegment extends BaseTest { storage.getStorageDir().getClosedLogFile(start, start + size - 1); LogEntryProto[] entries = new LogEntryProto[size]; - try (LogOutputStream out = new LogOutputStream(file, false, properties)) { + try (LogOutputStream out = new LogOutputStream(file, false, + segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < size; i++) { SimpleOperation op = new SimpleOperation("m" + i); entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), @@ -228,13 +238,6 @@ public class TestRaftLogSegment extends BaseTest { 
SegmentedRaftLog.HEADER_BYTES.length, term); } - private RaftProperties getProperties(long maxSegmentSize, int preallocatedSize) { - RaftProperties p = new RaftProperties(); - RaftServerConfigKeys.Log.setSegmentSizeMax(p, SizeInBytes.valueOf(maxSegmentSize)); - RaftServerConfigKeys.Log.setPreallocatedSize(p, SizeInBytes.valueOf(preallocatedSize)); - return p; - } - @Test public void testPreallocateSegment() throws Exception { RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); @@ -249,7 +252,7 @@ public class TestRaftLogSegment extends BaseTest { for (int max : maxSizes) { for (int a : preallocated) { try (LogOutputStream ignored = - new LogOutputStream(file, false, getProperties(max, a))) { + new LogOutputStream(file, false, max, a, bufferSize)) { Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a)); } try (LogInputStream in = @@ -265,7 +268,7 @@ public class TestRaftLogSegment extends BaseTest { Arrays.fill(content, (byte) 1); final long size; try (LogOutputStream out = new LogOutputStream(file, false, - getProperties(1024, 1024))) { + 1024, 1024, bufferSize)) { SimpleOperation op = new SimpleOperation(new String(content)); LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0, clientId, callId); @@ -289,9 +292,6 @@ public class TestRaftLogSegment extends BaseTest { @Test public void testPreallocationAndAppend() throws Exception { final SizeInBytes max = SizeInBytes.valueOf(2, TraditionalBinaryPrefix.MEGA); - RaftServerConfigKeys.Log.setSegmentSizeMax(properties, max); - RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf("16KB")); - RaftServerConfigKeys.Log.setWriteBufferSize(properties, SizeInBytes.valueOf("10KB")); RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); final File file = storage.getStorageDir().getOpenLogFile(0); @@ -304,7 +304,8 @@ public class TestRaftLogSegment extends BaseTest { long totalSize = 
SegmentedRaftLog.HEADER_BYTES.length; long preallocated = 16 * 1024; - try (LogOutputStream out = new LogOutputStream(file, false, properties)) { + try (LogOutputStream out = new LogOutputStream(file, false, + max.getSize(), 16 * 1024, 10 * 1024)) { Assert.assertEquals(preallocated, file.length()); while (totalSize + entrySize < max.getSize()) { totalSize += entrySize; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 4c54922..09b31c1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -70,6 +70,9 @@ public class TestSegmentedRaftLog extends BaseTest { private File storageDir; private RaftProperties properties; private RaftStorage storage; + private long segmentMaxSize; + private long preallocatedSize; + private int bufferSize; @Before public void setup() throws Exception { @@ -77,6 +80,12 @@ public class TestSegmentedRaftLog extends BaseTest { properties = new RaftProperties(); RaftServerConfigKeys.setStorageDir(properties, storageDir); storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR); + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); } @After @@ -95,7 +104,8 @@ public class TestSegmentedRaftLog extends BaseTest { final int size = (int) (range.end - range.start + 1); LogEntryProto[] entries = new LogEntryProto[size]; - 
try (LogOutputStream out = new LogOutputStream(file, false, properties)) { + try (LogOutputStream out = new LogOutputStream(file, false, + segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < size; i++) { SimpleOperation m = new SimpleOperation("m" + (i + range.start)); entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index a0c4f45..a6e6672 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; @@ -68,6 +69,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final TermIndexTracker termIndexTracker = new TermIndexTracker(); private final RaftProperties properties = new RaftProperties(); + private long segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + private long preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + private int bufferSize = + 
RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); private volatile boolean running = true; private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; @@ -143,7 +150,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { termIndex.getIndex()); LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(), termIndex.getIndex(), snapshotFile); - try (LogOutputStream out = new LogOutputStream(snapshotFile, false, properties)) { + try (LogOutputStream out = new LogOutputStream(snapshotFile, false, + segmentMaxSize, preallocatedSize, bufferSize)) { for (final LogEntryProto entry : list) { if (entry.getIndex() > endIndex) { break;
