Repository: incubator-ratis
Updated Branches:
  refs/heads/master 30cc77670 -> 22b70e9c4


RATIS-136. Reduce log level for segment rollover. Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/22b70e9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/22b70e9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/22b70e9c

Branch: refs/heads/master
Commit: 22b70e9c4f7d1fc3895d38cde66c5e6fa1654d7e
Parents: 30cc776
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Tue Nov 7 18:08:42 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Tue Nov 7 18:09:20 2017 -0800

----------------------------------------------------------------------
 .../apache/ratis/server/impl/FollowerState.java |  5 +--
 .../ratis/server/storage/LogOutputStream.java   |  8 ++---
 .../ratis/server/storage/RaftLogWorker.java     | 23 ++++++++++----
 .../server/storage/TestRaftLogReadWrite.java    | 33 +++++++++++++-------
 .../server/storage/TestRaftLogSegment.java      | 31 +++++++++---------
 .../server/storage/TestSegmentedRaftLog.java    | 12 ++++++-
 .../SimpleStateMachine4Testing.java             | 10 +++++-
 7 files changed, 82 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 0a44e2f..3fb5ecb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -40,7 +40,8 @@ class FollowerState extends Daemon {
 
   void updateLastRpcTime(boolean inLogSync) {
     lastRpcTime = new Timestamp();
-    LOG.trace("{} update last rpc time to {}", server.getId(), lastRpcTime);
+    LOG.trace("{} update last rpc time to {} {}", server.getId(),
+        lastRpcTime, inLogSync);
     this.inLogSync = inLogSync;
   }
 
@@ -69,7 +70,7 @@ class FollowerState extends Daemon {
         synchronized (server) {
           if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
             LOG.info("{} changes to CANDIDATE, lastRpcTime:{}, 
electionTimeout:{}ms",
-                server.getId(), lastRpcTime, electionTimeout);
+                server.getId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
             // election timeout, should become a candidate
             server.changeToCandidate();
             break;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
index db0789e..80e344c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
@@ -57,18 +57,18 @@ public class LogOutputStream implements Closeable {
   private final long preallocatedSize;
   private long preallocatedPos;
 
-  public LogOutputStream(File file, boolean append, RaftProperties properties)
+  public LogOutputStream(File file, boolean append, long segmentMaxSize,
+      long preallocatedSize, int bufferSize)
       throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
-    this.segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    this.preallocatedSize = 
RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.segmentMaxSize = segmentMaxSize;
+    this.preallocatedSize = preallocatedSize;
     RandomAccessFile rp = new RandomAccessFile(file, "rw");
     fc = rp.getChannel();
     fc.position(fc.size());
     preallocatedPos = fc.size();
 
-    final int bufferSize = 
RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     out = new BufferedWriteChannel(fc, bufferSize);
 
     if (!append) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 1858de6..e80ca02 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -71,7 +71,9 @@ class RaftLogWorker implements Runnable {
 
   private final int forceSyncNum;
 
-  private final  RaftProperties properties;
+  private final long segmentMaxSize;
+  private final long preallocatedSize;
+  private final int bufferSize;
 
   RaftLogWorker(RaftPeerId selfId, RaftServerImpl raftServer, RaftStorage 
storage,
                 RaftProperties properties) {
@@ -81,7 +83,12 @@ class RaftLogWorker implements Runnable {
     this.raftServer = raftServer;
 
     this.storage = storage;
-    this.properties = properties;
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
     this.workerThread = new Thread(this, name);
   }
@@ -92,7 +99,8 @@ class RaftLogWorker implements Runnable {
     flushedIndex = latestIndex;
     if (openSegmentFile != null) {
       Preconditions.assertTrue(openSegmentFile.exists());
-      out = new LogOutputStream(openSegmentFile, true, properties);
+      out = new LogOutputStream(openSegmentFile, true, segmentMaxSize,
+          preallocatedSize, bufferSize);
     }
     workerThread.start();
   }
@@ -227,6 +235,8 @@ class RaftLogWorker implements Runnable {
   }
 
   void rollLogSegment(LogSegment segmentToClose) {
+    LOG.info("Rolling segment:{} index to:{}", name,
+        (segmentToClose.getEndIndex() + 1));
     addIOTask(new FinalizeLogSegment(segmentToClose));
     addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
   }
@@ -285,7 +295,7 @@ class RaftLogWorker implements Runnable {
 
       File openFile = storage.getStorageDir()
           .getOpenLogFile(segmentToClose.getStartIndex());
-      LOG.info("{} finalizing log segment {}", name, openFile);
+      LOG.debug("{} finalizing log segment {}", name, openFile);
       Preconditions.assertTrue(openFile.exists(),
           () -> name + ": File " + openFile + " does not exist, 
segmentToClose="
               + segmentToClose.toDebugString());
@@ -323,11 +333,12 @@ class RaftLogWorker implements Runnable {
     @Override
     void execute() throws IOException {
       File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
-      LOG.info("{} creating new log segment {}", name, openFile);
+      LOG.debug("{} creating new log segment {}", name, openFile);
       Preconditions.assertTrue(!openFile.exists(), "open file %s exists for 
%s",
           openFile, name);
       Preconditions.assertTrue(out == null && pendingFlushNum == 0);
-      out = new LogOutputStream(openFile, false, properties);
+      out = new LogOutputStream(openFile, false, segmentMaxSize,
+          preallocatedSize, bufferSize);
       Preconditions.assertTrue(openFile.exists(), "Failed to create file %s 
for %s",
           openFile.getAbsolutePath(), name);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
index d05ffda..cb512a5 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
@@ -51,13 +51,21 @@ public class TestRaftLogReadWrite extends BaseTest {
   private static final long callId = 0;
 
   private File storageDir;
-  private RaftProperties properties;
+  private long segmentMaxSize;
+  private long preallocatedSize;
+  private int bufferSize;
 
   @Before
   public void setup() throws Exception {
     storageDir = getTestDir();
-    properties = new RaftProperties();
+    RaftProperties properties = new RaftProperties();
     RaftServerConfigKeys.setStorageDir(properties, storageDir);
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
   }
 
   @After
@@ -105,7 +113,8 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[100];
     try (LogOutputStream out =
-             new LogOutputStream(openSegment, false, properties)) {
+             new LogOutputStream(openSegment, false, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
       size += writeMessages(entries, out);
     } finally {
       storage.close();
@@ -124,7 +133,8 @@ public class TestRaftLogReadWrite extends BaseTest {
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
     LogEntryProto[] entries = new LogEntryProto[200];
     try (LogOutputStream out =
-             new LogOutputStream(openSegment, false, properties)) {
+             new LogOutputStream(openSegment, false, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
       for (int i = 0; i < 100; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
@@ -134,7 +144,8 @@ public class TestRaftLogReadWrite extends BaseTest {
     }
 
     try (LogOutputStream out =
-             new LogOutputStream(openSegment, true, properties)) {
+             new LogOutputStream(openSegment, true, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
       for (int i = 100; i < 200; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
@@ -161,7 +172,8 @@ public class TestRaftLogReadWrite extends BaseTest {
     long size = SegmentedRaftLog.HEADER_BYTES.length;
 
     LogEntryProto[] entries = new LogEntryProto[100];
-    LogOutputStream out = new LogOutputStream(openSegment, false, properties);
+    LogOutputStream out = new LogOutputStream(openSegment, false,
+        segmentMaxSize, preallocatedSize, bufferSize);
     size += writeMessages(entries, out);
     out.flush();
 
@@ -185,14 +197,12 @@ public class TestRaftLogReadWrite extends BaseTest {
    */
   @Test
   public void testReadWithCorruptPadding() throws IOException {
-    RaftServerConfigKeys.Log.setPreallocatedSize(properties, 
SizeInBytes.valueOf("4MB"));
-    RaftServerConfigKeys.Log.setSegmentSizeMax(properties, 
SizeInBytes.valueOf("16MB"));
-
     final RaftStorage storage = new RaftStorage(storageDir, 
StartupOption.REGULAR);
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
 
     LogEntryProto[] entries = new LogEntryProto[10];
-    LogOutputStream out = new LogOutputStream(openSegment, false, properties);
+    LogOutputStream out = new LogOutputStream(openSegment, false,
+        16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize);
     for (int i = 0; i < 10; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
@@ -240,7 +250,8 @@ public class TestRaftLogReadWrite extends BaseTest {
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
     try (LogOutputStream out =
-             new LogOutputStream(openSegment, false, properties)) {
+             new LogOutputStream(openSegment, false, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
       for (int i = 0; i < 100; i++) {
         LogEntryProto entry = ProtoUtils.toLogEntryProto(
             new SimpleOperation("m" + i).getLogEntryContent(), 0, i,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index 69d78f7..26477c2 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -52,12 +52,21 @@ public class TestRaftLogSegment extends BaseTest {
   private static final long callId = 0;
 
   private File storageDir;
-  private final RaftProperties properties = new RaftProperties();
+  private long segmentMaxSize;
+  private long preallocatedSize;
+  private int bufferSize;
 
   @Before
   public void setup() throws Exception {
+    RaftProperties properties = new RaftProperties();
     storageDir = getTestDir();
     RaftServerConfigKeys.setStorageDir(properties, storageDir);
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
   }
 
   @After
@@ -74,7 +83,8 @@ public class TestRaftLogSegment extends BaseTest {
         storage.getStorageDir().getClosedLogFile(start, start + size - 1);
 
     LogEntryProto[] entries = new LogEntryProto[size];
-    try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
+    try (LogOutputStream out = new LogOutputStream(file, false,
+        segmentMaxSize, preallocatedSize, bufferSize)) {
       for (int i = 0; i < size; i++) {
         SimpleOperation op = new SimpleOperation("m" + i);
         entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
@@ -228,13 +238,6 @@ public class TestRaftLogSegment extends BaseTest {
         SegmentedRaftLog.HEADER_BYTES.length, term);
   }
 
-  private RaftProperties getProperties(long maxSegmentSize, int 
preallocatedSize) {
-    RaftProperties p = new RaftProperties();
-    RaftServerConfigKeys.Log.setSegmentSizeMax(p, 
SizeInBytes.valueOf(maxSegmentSize));
-    RaftServerConfigKeys.Log.setPreallocatedSize(p, 
SizeInBytes.valueOf(preallocatedSize));
-    return p;
-  }
-
   @Test
   public void testPreallocateSegment() throws Exception {
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
@@ -249,7 +252,7 @@ public class TestRaftLogSegment extends BaseTest {
     for (int max : maxSizes) {
       for (int a : preallocated) {
         try (LogOutputStream ignored =
-                 new LogOutputStream(file, false, getProperties(max, a))) {
+                 new LogOutputStream(file, false, max, a, bufferSize)) {
           Assert.assertEquals("max=" + max + ", a=" + a, file.length(), 
Math.min(max, a));
         }
         try (LogInputStream in =
@@ -265,7 +268,7 @@ public class TestRaftLogSegment extends BaseTest {
     Arrays.fill(content, (byte) 1);
     final long size;
     try (LogOutputStream out = new LogOutputStream(file, false,
-        getProperties(1024, 1024))) {
+        1024, 1024, bufferSize)) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
           0, 0, clientId, callId);
@@ -289,9 +292,6 @@ public class TestRaftLogSegment extends BaseTest {
   @Test
   public void testPreallocationAndAppend() throws Exception {
     final SizeInBytes max = SizeInBytes.valueOf(2, 
TraditionalBinaryPrefix.MEGA);
-    RaftServerConfigKeys.Log.setSegmentSizeMax(properties, max);
-    RaftServerConfigKeys.Log.setPreallocatedSize(properties, 
SizeInBytes.valueOf("16KB"));
-    RaftServerConfigKeys.Log.setWriteBufferSize(properties, 
SizeInBytes.valueOf("10KB"));
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
     final File file = storage.getStorageDir().getOpenLogFile(0);
 
@@ -304,7 +304,8 @@ public class TestRaftLogSegment extends BaseTest {
 
     long totalSize = SegmentedRaftLog.HEADER_BYTES.length;
     long preallocated = 16 * 1024;
-    try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
+    try (LogOutputStream out = new LogOutputStream(file, false,
+        max.getSize(), 16 * 1024, 10 * 1024)) {
       Assert.assertEquals(preallocated, file.length());
       while (totalSize + entrySize < max.getSize()) {
         totalSize += entrySize;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 4c54922..09b31c1 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -70,6 +70,9 @@ public class TestSegmentedRaftLog extends BaseTest {
   private File storageDir;
   private RaftProperties properties;
   private RaftStorage storage;
+  private long segmentMaxSize;
+  private long preallocatedSize;
+  private int bufferSize;
 
   @Before
   public void setup() throws Exception {
@@ -77,6 +80,12 @@ public class TestSegmentedRaftLog extends BaseTest {
     properties = new RaftProperties();
     RaftServerConfigKeys.setStorageDir(properties, storageDir);
     storage = new RaftStorage(storageDir, 
RaftServerConstants.StartupOption.REGULAR);
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
   }
 
   @After
@@ -95,7 +104,8 @@ public class TestSegmentedRaftLog extends BaseTest {
 
       final int size = (int) (range.end - range.start + 1);
       LogEntryProto[] entries = new LogEntryProto[size];
-      try (LogOutputStream out = new LogOutputStream(file, false, properties)) 
{
+      try (LogOutputStream out = new LogOutputStream(file, false,
+          segmentMaxSize, preallocatedSize, bufferSize)) {
         for (int i = 0; i < size; i++) {
           SimpleOperation m = new SimpleOperation("m" + (i + range.start));
           entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22b70e9c/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index a0c4f45..a6e6672 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -68,6 +69,12 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
   private final TermIndexTracker termIndexTracker = new TermIndexTracker();
   private final RaftProperties properties = new RaftProperties();
+  private long segmentMaxSize =
+      RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+  private long preallocatedSize =
+      RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+  private int bufferSize =
+      RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
 
   private volatile boolean running = true;
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
@@ -143,7 +150,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
         termIndex.getIndex());
     LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", 
termIndex.getTerm(),
         termIndex.getIndex(), snapshotFile);
-    try (LogOutputStream out = new LogOutputStream(snapshotFile, false, 
properties)) {
+    try (LogOutputStream out = new LogOutputStream(snapshotFile, false,
+        segmentMaxSize, preallocatedSize, bufferSize)) {
       for (final LogEntryProto entry : list) {
         if (entry.getIndex() > endIndex) {
           break;

Reply via email to