Repository: incubator-ratis
Updated Branches:
  refs/heads/master cce89d059 -> 079ad4876


RATIS-401. StateMachineDataPolicy.getFromFuture should throw an exception if all retries have failed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/079ad487
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/079ad487
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/079ad487

Branch: refs/heads/master
Commit: 079ad4876bd92e6acd43667c5ade610311a5e4c1
Parents: cce89d0
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Thu Nov 8 16:59:40 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Thu Nov 8 16:59:40 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/JavaUtils.java   |  2 +-
 .../ratis/server/storage/RaftLogWorker.java     |  6 +++
 .../server/storage/TestSegmentedRaftLog.java    | 52 +++++++++++++++++---
 3 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/079ad487/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index b855b2a..95fcf35 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -139,7 +139,7 @@ public interface JavaUtils {
           throw t;
         }
         if (log != null && log.isWarnEnabled()) {
-          log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
+          log.warn("FAILED \"" + name + "\", attempt #" + i + "/" + numAttempts
               + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/079ad487/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 82b0b49..8e1c855 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -60,6 +61,7 @@ class RaftLogWorker implements Runnable {
       this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
       this.syncTimeout = 
RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
       this.syncTimeoutRetry = 
RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties);
+      Preconditions.assertTrue(syncTimeoutRetry >= -1);
     }
 
     boolean isSync() {
@@ -68,14 +70,18 @@ class RaftLogWorker implements Runnable {
 
     void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) 
throws IOException {
       Preconditions.assertTrue(isSync());
+      TimeoutIOException lastException = null;
       for(int retry = 0; syncTimeoutRetry == -1 || retry <= syncTimeoutRetry; 
retry++) {
         try {
           IOUtils.getFromFuture(future, getName, syncTimeout);
           return;
         } catch(TimeoutIOException e) {
           LOG.warn("Timeout " + retry + (syncTimeoutRetry == -1? "/~": "/" + 
syncTimeoutRetry), e);
+          lastException = e;
         }
       }
+      Objects.requireNonNull(lastException, "lastException == null");
+      throw lastException;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/079ad487/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 8083b62..bcbfa73 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -22,6 +22,7 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
@@ -31,11 +32,14 @@ import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -175,7 +180,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
-  List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
+  static List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
       Supplier<String> stringSupplier) {
     List<LogEntryProto> eList = new ArrayList<>();
     for (SegmentRange range : slist) {
@@ -184,17 +189,21 @@ public class TestSegmentedRaftLog extends BaseTest {
     return eList;
   }
 
-  List<LogEntryProto> prepareLogEntries(SegmentRange range,
+  static List<LogEntryProto> prepareLogEntries(SegmentRange range,
       Supplier<String> stringSupplier, boolean hasStataMachineData, 
List<LogEntryProto> eList) {
     for(long index = range.start; index <= range.end; index++) {
-      SimpleOperation m = stringSupplier == null?
-          new SimpleOperation("m" + index, hasStataMachineData):
-          new SimpleOperation(stringSupplier.get(), hasStataMachineData);
-      eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 
range.term, index));
+      eList.add(prepareLogEntry(range.term, index, stringSupplier, 
hasStataMachineData));
     }
     return eList;
   }
 
+  static LogEntryProto prepareLogEntry(long term, long index, Supplier<String> 
stringSupplier, boolean hasStataMachineData) {
+    final SimpleOperation m = stringSupplier == null?
+        new SimpleOperation("m" + index, hasStataMachineData):
+        new SimpleOperation(stringSupplier.get(), hasStataMachineData);
+    return ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), term, 
index);
+  }
+
   /**
    * Append entry one by one and check if log state is correct.
    */
@@ -447,6 +456,35 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
+  @Test
+  public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws 
Exception {
+    RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
+    final TimeDuration syncTimeout = TimeDuration.valueOf(100, 
TimeUnit.MILLISECONDS);
+    RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties, 
syncTimeout);
+    final int numRetries = 2;
+    RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, 
numRetries);
+    ExitUtils.disableSystemExit();
+
+    final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
+    final StateMachine sm = new BaseStateMachine() {
+      @Override
+      public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
+        return new CompletableFuture<>(); // the future never completes
+      }
+    };
+
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, 
null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.appendEntry(entry);  // RaftLogWorker should catch 
TimeoutIOException
+
+      JavaUtils.attempt(() -> {
+        final ExitUtils.ExitException exitException = 
ExitUtils.getFirstExitException();
+        Objects.requireNonNull(exitException, "exitException == null");
+        Assert.assertEquals(TimeoutIOException.class, 
exitException.getCause().getClass());
+      }, 3*numRetries, syncTimeout, "RaftLogWorker should catch 
TimeoutIOException and exit", LOG);
+    }
+  }
+
   static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
     final Thread t = new Thread(() -> raftLog.appendEntry(entry));
     t.start();

Reply via email to