This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7c7a3 RATIS-573. Handle Raft Log Append Failure. Contributed by Supratim Deka
9d7c7a3 is described below
commit 9d7c7a38a97b4a93704259d4e837cf6fed74c856
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 24 13:19:25 2019 +0800
RATIS-573. Handle Raft Log Append Failure. Contributed by Supratim Deka
---
.../server/raftlog/segmented/SegmentedRaftLog.java | 4 +++
.../raftlog/segmented/SegmentedRaftLogWorker.java | 33 ++++++++++++++++++++--
.../apache/ratis/statemachine/StateMachine.java | 10 +++++++
.../raftlog/segmented/TestSegmentedRaftLog.java | 19 +++++++++++--
4 files changed, 60 insertions(+), 6 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index a602bdc..7ea75ce 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -83,9 +83,13 @@ public class SegmentedRaftLog extends RaftLog {
}
void done() {
+ Preconditions.assertTrue(!future.isDone());
future.complete(getEndIndex());
}
+ void failed(IOException e) {
+ this.getFuture().completeExceptionally(e);
+ }
abstract void execute() throws IOException;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index a29a727..c4f6aa9 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -30,6 +30,7 @@ import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
@@ -232,20 +233,33 @@ class SegmentedRaftLogWorker implements Runnable {
@Override
public void run() {
+
+ // Set once a log task fails with an exception; while it is set, later tasks
+ // are not executed and this exception is thrown in their place.
+ RaftLogIOException logIOException = null;
+
while (running) {
try {
Task task = queue.poll(ONE_SECOND);
if (task != null) {
try {
- task.execute();
+ if (logIOException != null) {
+ throw logIOException;
+ } else {
+ task.execute();
+ }
} catch (IOException e) {
if (task.getEndIndex() < lastWrittenIndex) {
LOG.info("Ignore IOException when handling task " + task
+ " which is smaller than the lastWrittenIndex."
+ " There should be a snapshot installed.", e);
} else {
- task.getFuture().completeExceptionally(e);
- throw e;
+ task.failed(e);
+ if (logIOException == null) {
+ logIOException = new RaftLogIOException("Log already failed"
+ + " at index " + task.getEndIndex()
+ + " for task " + task, e);
+ }
+ continue;
}
}
task.done();
@@ -391,6 +405,12 @@ class SegmentedRaftLogWorker implements Runnable {
}
@Override
+ void failed(IOException e) {
+ stateMachine.notifyLogFailed(e, entry);
+ super.failed(e);
+ }
+
+ @Override
int getSerializedSize() {
return ServerProtoUtils.getSerializedSize(entry);
}
@@ -461,6 +481,13 @@ class SegmentedRaftLogWorker implements Runnable {
}
@Override
+ void failed(IOException e) {
+ // not failed for a specific log entry, but an entire segment
+ stateMachine.notifyLogFailed(e, null);
+ super.failed(e);
+ }
+
+ @Override
long getEndIndex() {
return endIndex;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 19bd9e6..5ce8e96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -221,6 +221,16 @@ public interface StateMachine extends Closeable {
}
/**
+ * Notify the state machine that a Raft log operation has failed.
+ * This notification is triggered when a log operation throws an exception.
+ * @param t the exception that was caught; indicates the possible cause.
+ * @param failedEntry the entry whose append failed, or null if the failure is not tied to a specific entry.
+ */
+ default void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
+
+ }
+
+ /**
* Notify the Leader's state machine that a leader has not been elected for a long time
* this notification is based on "raft.server.leader.election.timeout"
*
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index dcbe013..cda1043 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -36,6 +36,7 @@ import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
@@ -62,8 +63,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestSegmentedRaftLog extends BaseTest {
@@ -523,8 +522,22 @@ public class TestSegmentedRaftLog extends BaseTest {
final StateMachine sm = new BaseStateMachine() {
@Override
public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
+ lifeCycle.transition(LifeCycle.State.STARTING);
+ lifeCycle.transition(LifeCycle.State.RUNNING);
+
return new CompletableFuture<>(); // the future never completes
}
+
+ @Override
+ public void notifyLogFailed(Throwable t, LogEntryProto entry) {
+ LOG.info("Test StateMachine : Ratis log failed notification received, "
+ + "as expected. Transition to PAUSED state.");
+
+ Assert.assertNotNull(entry);
+
+ lifeCycle.transition(LifeCycle.State.PAUSING);
+ lifeCycle.transition(LifeCycle.State.PAUSED);
+ }
};
RaftServerImpl server = mock(RaftServerImpl.class);
@@ -542,7 +555,7 @@ public class TestSegmentedRaftLog extends BaseTest {
}
}
Assert.assertNotNull(ex);
- verify(server, times(1)).shutdown(false);
+ Assert.assertSame(LifeCycle.State.PAUSED, sm.getLifeCycleState());
throw ex;
}