Repository: incubator-ratis
Updated Branches:
  refs/heads/master 16cadb68b -> 1e8b23447


RATIS-406. In RaftServerImpl, the RaftLog.append(entries) call should not hold 
the RaftServerImpl lock.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1e8b2344
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1e8b2344
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1e8b2344

Branch: refs/heads/master
Commit: 1e8b234478c61db323c7149a8b14d8435be6cd82
Parents: 16cadb6
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Wed Nov 14 17:14:51 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Wed Nov 14 17:14:51 2018 -0800

----------------------------------------------------------------------
 .../ratis/server/impl/RaftServerImpl.java       |  10 +-
 .../ratis/server/storage/MemoryRaftLog.java     |  17 +--
 .../apache/ratis/server/storage/RaftLog.java    |  68 +++++-----
 .../server/storage/RaftLogSequentialOps.java    | 125 +++++++++++++++++++
 .../ratis/server/storage/SegmentedRaftLog.java  |   9 +-
 5 files changed, 167 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 48a21ee..0c0642c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -48,8 +48,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
 import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
 import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
@@ -911,12 +909,12 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         return CompletableFuture.completedFuture(reply);
       }
 
-      futures = state.getLog().append(entries);
-
       state.updateConfiguration(entries);
-
-      commitInfos.forEach(commitInfoCache::update);
     }
+
+    futures = state.getLog().append(entries);
+    commitInfos.forEach(commitInfoCache::update);
+
     if (!isHeartbeat) {
       CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 2661ba8..6443ac5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -108,7 +108,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  CompletableFuture<Long> truncate(long index) {
+  CompletableFuture<Long> truncateImpl(long index) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       Preconditions.assertTrue(index >= 0);
@@ -126,7 +126,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  CompletableFuture<Long> appendEntry(LogEntryProto entry) {
+  CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       validateLogEntry(entry);
@@ -136,23 +136,12 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public long append(long term, RaftConfiguration newConf) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      final long nextIndex = getNextIndex();
-      final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, 
nextIndex);
-      entries.add(e);
-      return nextIndex;
-    }
-  }
-
-  @Override
   public long getStartIndex() {
     return entries.size() == 0? RaftServerConstants.INVALID_LOG_INDEX: 
entries.getTermIndex(0).getIndex();
   }
 
   @Override
-  public List<CompletableFuture<Long>> append(LogEntryProto... entries) {
+  public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) {
     checkLogState();
     if (entries == null || entries.length == 0) {
       return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 00650ca..091f3e8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -49,11 +49,10 @@ import java.util.function.Consumer;
  * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
  *    in segments.
  */
-public abstract class RaftLog implements Closeable {
+public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
   public static final String LOG_SYNC = RaftLog.class.getSimpleName() + 
".logSync";
 
-
   /**
    * The largest committed index. Note the last committed log may be included
    * in the latest snapshot file.
@@ -64,6 +63,7 @@ public abstract class RaftLog implements Closeable {
   private final int maxBufferSize;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+  private final Runner runner = new Runner(this::getName);
   private final OpenCloseState state;
 
   public RaftLog(RaftPeerId selfId, int maxBufferSize) {
@@ -132,12 +132,12 @@ public abstract class RaftLog implements Closeable {
     return last.getIndex() + 1;
   }
 
-  /**
-   * Generate a log entry for the given term and message, and append the entry.
-   * Used by the leader.
-   * @return the index of the new log entry.
-   */
-  public long append(long term, TransactionContext operation) throws 
StateMachineException {
+  @Override
+  public final long append(long term, TransactionContext transaction) throws 
StateMachineException {
+    return runner.runSequentially(() -> appendImpl(term, transaction));
+  }
+
+  private long appendImpl(long term, TransactionContext operation) throws 
StateMachineException {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       final long nextIndex = getNextIndex();
@@ -164,12 +164,12 @@ public abstract class RaftLog implements Closeable {
     }
   }
 
-  /**
-   * Generate a log entry for the given term and configurations,
-   * and append the entry. Used by the leader.
-   * @return the index of the new log entry.
-   */
-  public long append(long term, RaftConfiguration newConf) {
+  @Override
+  public final long append(long term, RaftConfiguration configuration) {
+    return runner.runSequentially(() -> appendImpl(term, configuration));
+  }
+
+  private long appendImpl(long term, RaftConfiguration newConf) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       final long nextIndex = getNextIndex();
@@ -240,30 +240,26 @@ public abstract class RaftLog implements Closeable {
     }
   }
 
-  /**
-   * Truncate the log entries till the given index. The log with the given 
index
-   * will also be truncated (i.e., inclusive).
-   */
-  abstract CompletableFuture<Long> truncate(long index);
+  @Override
+  public final CompletableFuture<Long> truncate(long index) {
+    return runner.runSequentially(() -> truncateImpl(index));
+  }
 
-  /**
-   * Used by the leader when appending a new entry based on client's request
-   * or configuration change.
-   */
-  abstract CompletableFuture<Long> appendEntry(LogEntryProto entry);
+  abstract CompletableFuture<Long> truncateImpl(long index);
 
-  /**
-   * Append all the given log entries. Used by the followers.
-   *
-   * If an existing entry conflicts with a new one (same index but different
-   * terms), delete the existing entry and all entries that follow it (§5.3).
-   *
-   * This method, {@link #append(long, TransactionContext)},
-   * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
-   * do not guarantee the changes are persisted.
-   * Need to wait for the returned futures to persist the changes.
-   */
-  public abstract List<CompletableFuture<Long>> append(LogEntryProto... 
entries);
+  @Override
+  public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
+    return runner.runSequentially(() -> appendEntryImpl(entry));
+  }
+
+  abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry);
+
+  @Override
+  public final List<CompletableFuture<Long>> append(LogEntryProto... entries) {
+    return runner.runSequentially(() -> appendImpl(entries));
+  }
+
+  abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
 
   /**
    * @return the index of the latest entry that has been flushed to the local

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
new file mode 100644
index 0000000..d73d45c
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.StringUtils;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Sequential operations in {@link RaftLog}.
+ *
+ * All methods in this class MUST be invoked by a single thread at any time.
+ * The threads can be different at different times.
+ * The same thread may invoke any of the methods again and again.
+ * In other words, two or more threads invoking these methods (the same method 
or different methods)
+ * at the same time is not allowed since the sequence of invocations cannot be 
guaranteed.
+ *
+ * All methods in this class are asynchronous in the sense that the underlying 
I/O operations are asynchronous.
+ */
+interface RaftLogSequentialOps {
+  class Runner {
+    private final Object name;
+    private final AtomicReference<Thread> runner = new AtomicReference<>();
+
+    Runner(Supplier<String> name) {
+      this.name = StringUtils.stringSupplierAsObject(name);
+    }
+
+    /**
+     * Run the given operation sequentially.
+     * This method can be invoked by different threads but only one thread at 
any given time is allowed.
+     * The same thread can call this method multiple times.
+     *
+     * @throws IllegalStateException if this runner is already running an 
operation invoked by a different thread.
+     */
+    <OUTPUT, THROWABLE extends Throwable> OUTPUT runSequentially(
+        CheckedSupplier<OUTPUT, THROWABLE> operation) throws THROWABLE {
+      final Thread current = Thread.currentThread();
+      // update only if the runner is null
+      final Thread previous = runner.getAndUpdate(prev -> prev != null? prev: 
current);
+      if (previous == null) {
+        // The current thread becomes the runner.
+        try {
+          return operation.get();
+        } finally {
+          // prev is expected to be current
+          final Thread got = runner.getAndUpdate(prev -> prev != current? 
prev: null);
+          Preconditions.assertTrue(got == current,
+              () -> name + ": Unexpected runner " + got + " != " + current);
+        }
+      } else if (previous == current) {
+        // The current thread is already the runner.
+        return operation.get();
+      } else {
+        throw new IllegalStateException(
+            name + ": Already running a method by " + previous + ", current=" 
+ current);
+      }
+    }
+  }
+
+  /**
+   * Append asynchronously a log entry for the given term and transaction.
+   * Used by the leader.
+   *
+   * Note that the underlying I/O operation is submitted but may not be 
completed when this method returns.
+   *
+   * @return the index of the new log entry.
+   */
+  long append(long term, TransactionContext transaction) throws 
StateMachineException;
+
+  /**
+   * Append asynchronously a log entry for the given term and configuration.
+   * Used by the leader.
+   *
+   * Note that the underlying I/O operation is submitted but may not be 
completed when this method returns.
+   *
+   * @return the index of the new log entry.
+   */
+  long append(long term, RaftConfiguration configuration);
+
+  /**
+   * Append asynchronously an entry.
+   * Used by the leader and the followers.
+   */
+  CompletableFuture<Long> appendEntry(LogEntryProto entry);
+
+  /**
+   * Append asynchronously all the given log entries.
+   * Used by the followers.
+   *
+   * If an existing entry conflicts with a new one (same index but different 
terms),
+   * delete the existing entry and all entries that follow it (§5.3).
+   */
+  List<CompletableFuture<Long>> append(LogEntryProto... entries);
+
+  /**
+   * Truncate asynchronously the log entries till the given index 
(inclusively).
+   * Used by the leader and the followers.
+   */
+  CompletableFuture<Long> truncate(long index);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 666b9c8..3aee7e1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -34,7 +34,6 @@ import org.apache.ratis.util.Preconditions;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -253,7 +252,7 @@ public class SegmentedRaftLog extends RaftLog {
    * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
    */
   @Override
-  CompletableFuture<Long> truncate(long index) {
+  CompletableFuture<Long> truncateImpl(long index) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       RaftLogCache.TruncationSegments ts = cache.truncate(index);
@@ -266,8 +265,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  CompletableFuture<Long> appendEntry(LogEntryProto entry) {
-
+  CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
     checkLogState();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", server.getId(),
@@ -322,8 +320,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public List<CompletableFuture<Long>> append(LogEntryProto... entries) {
-
+  public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) {
     checkLogState();
     if (entries == null || entries.length == 0) {
       return Collections.emptyList();

Reply via email to