Repository: incubator-ratis Updated Branches: refs/heads/master 16cadb68b -> 1e8b23447
RATIS-406. In RaftServerImpl, the RaftLog.append(entries) call should not hold the RaftServerImpl lock. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1e8b2344 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1e8b2344 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1e8b2344 Branch: refs/heads/master Commit: 1e8b234478c61db323c7149a8b14d8435be6cd82 Parents: 16cadb6 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Wed Nov 14 17:14:51 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Wed Nov 14 17:14:51 2018 -0800 ---------------------------------------------------------------------- .../ratis/server/impl/RaftServerImpl.java | 10 +- .../ratis/server/storage/MemoryRaftLog.java | 17 +-- .../apache/ratis/server/storage/RaftLog.java | 68 +++++----- .../server/storage/RaftLogSequentialOps.java | 125 +++++++++++++++++++ .../ratis/server/storage/SegmentedRaftLog.java | 9 +- 5 files changed, 167 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 48a21ee..0c0642c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -48,8 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; @@ -911,12 +909,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return CompletableFuture.completedFuture(reply); } - futures = state.getLog().append(entries); - state.updateConfiguration(entries); - - commitInfos.forEach(commitInfoCache::update); } + + futures = state.getLog().append(entries); + commitInfos.forEach(commitInfoCache::update); + if (!isHeartbeat) { CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index 2661ba8..6443ac5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -108,7 +108,7 @@ public class MemoryRaftLog extends RaftLog { } @Override - CompletableFuture<Long> truncate(long index) { + CompletableFuture<Long> truncateImpl(long index) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { Preconditions.assertTrue(index >= 0); @@ -126,7 +126,7 @@ public class MemoryRaftLog extends RaftLog { } @Override - CompletableFuture<Long> appendEntry(LogEntryProto entry) { + CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { validateLogEntry(entry); @@ -136,23 +136,12 @@ public class MemoryRaftLog extends RaftLog { } @Override - public long append(long term, RaftConfiguration newConf) { - checkLogState(); - 
try(AutoCloseableLock writeLock = writeLock()) { - final long nextIndex = getNextIndex(); - final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, nextIndex); - entries.add(e); - return nextIndex; - } - } - - @Override public long getStartIndex() { return entries.size() == 0? RaftServerConstants.INVALID_LOG_INDEX: entries.getTermIndex(0).getIndex(); } @Override - public List<CompletableFuture<Long>> append(LogEntryProto... entries) { + public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) { checkLogState(); if (entries == null || entries.length == 0) { return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 00650ca..091f3e8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -49,11 +49,10 @@ import java.util.function.Consumer; * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored * in segments. */ -public abstract class RaftLog implements Closeable { +public abstract class RaftLog implements RaftLogSequentialOps, Closeable { public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class); public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync"; - /** * The largest committed index. Note the last committed log may be included * in the latest snapshot file. 
@@ -64,6 +63,7 @@ public abstract class RaftLog implements Closeable { private final int maxBufferSize; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final Runner runner = new Runner(this::getName); private final OpenCloseState state; public RaftLog(RaftPeerId selfId, int maxBufferSize) { @@ -132,12 +132,12 @@ public abstract class RaftLog implements Closeable { return last.getIndex() + 1; } - /** - * Generate a log entry for the given term and message, and append the entry. - * Used by the leader. - * @return the index of the new log entry. - */ - public long append(long term, TransactionContext operation) throws StateMachineException { + @Override + public final long append(long term, TransactionContext transaction) throws StateMachineException { + return runner.runSequentially(() -> appendImpl(term, transaction)); + } + + private long appendImpl(long term, TransactionContext operation) throws StateMachineException { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); @@ -164,12 +164,12 @@ public abstract class RaftLog implements Closeable { } } - /** - * Generate a log entry for the given term and configurations, - * and append the entry. Used by the leader. - * @return the index of the new log entry. - */ - public long append(long term, RaftConfiguration newConf) { + @Override + public final long append(long term, RaftConfiguration configuration) { + return runner.runSequentially(() -> appendImpl(term, configuration)); + } + + private long appendImpl(long term, RaftConfiguration newConf) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); @@ -240,30 +240,26 @@ public abstract class RaftLog implements Closeable { } } - /** - * Truncate the log entries till the given index. The log with the given index - * will also be truncated (i.e., inclusive). 
- */ - abstract CompletableFuture<Long> truncate(long index); + @Override + public final CompletableFuture<Long> truncate(long index) { + return runner.runSequentially(() -> truncateImpl(index)); + } - /** - * Used by the leader when appending a new entry based on client's request - * or configuration change. - */ - abstract CompletableFuture<Long> appendEntry(LogEntryProto entry); + abstract CompletableFuture<Long> truncateImpl(long index); - /** - * Append all the given log entries. Used by the followers. - * - * If an existing entry conflicts with a new one (same index but different - * terms), delete the existing entry and all entries that follow it (§5.3). - * - * This method, {@link #append(long, TransactionContext)}, - * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)}, - * do not guarantee the changes are persisted. - * Need to wait for the returned futures to persist the changes. - */ - public abstract List<CompletableFuture<Long>> append(LogEntryProto... entries); + @Override + public final CompletableFuture<Long> appendEntry(LogEntryProto entry) { + return runner.runSequentially(() -> appendEntryImpl(entry)); + } + + abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry); + + @Override + public final List<CompletableFuture<Long>> append(LogEntryProto... entries) { + return runner.runSequentially(() -> appendImpl(entries)); + } + + abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... 
entries); /** * @return the index of the latest entry that has been flushed to the local http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java new file mode 100644 index 0000000..d73d45c --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogSequentialOps.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.StateMachineException; +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.CheckedSupplier; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.StringUtils; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Sequential operations in {@link RaftLog}. + * + * All methods in this class MUST be invoked by a single thread at any time. + * The threads can be different at different times. + * The same thread may invoke any of the methods again and again. + * In other words, two or more threads invoking these methods (the same method or different methods) + * at the same time is not allowed since the sequence of invocations cannot be guaranteed. + * + * All methods in this class are asynchronous in the sense that the underlying I/O operations are asynchronous. + */ +interface RaftLogSequentialOps { + class Runner { + private final Object name; + private final AtomicReference<Thread> runner = new AtomicReference<>(); + + Runner(Supplier<String> name) { + this.name = StringUtils.stringSupplierAsObject(name); + } + + /** + * Run the given operation sequentially. + * This method can be invoked by different threads but only one thread at any given time is allowed. + * The same thread can call this method multiple times. + * + * @throws IllegalStateException if this runner is already running another operation. + */ + <OUTPUT, THROWABLE extends Throwable> OUTPUT runSequentially( + CheckedSupplier<OUTPUT, THROWABLE> operation) throws THROWABLE { + final Thread current = Thread.currentThread(); + // update only if the runner is null + final Thread previous = runner.getAndUpdate(prev -> prev != null? 
prev: current); + if (previous == null) { + // The current thread becomes the runner. + try { + return operation.get(); + } finally { + // prev is expected to be current + final Thread got = runner.getAndUpdate(prev -> prev != current? prev: null); + Preconditions.assertTrue(got == current, + () -> name + ": Unexpected runner " + got + " != " + current); + } + } else if (previous == current) { + // The current thread is already the runner. + return operation.get(); + } else { + throw new IllegalStateException( + name + ": Already running a method by " + previous + ", current=" + current); + } + } + } + + /** + * Append asynchronously a log entry for the given term and transaction. + * Used by the leader. + * + * Note that the underlying I/O operation is submitted but may not be completed when this method returns. + * + * @return the index of the new log entry. + */ + long append(long term, TransactionContext transaction) throws StateMachineException; + + /** + * Append asynchronously a log entry for the given term and configuration. + * Used by the leader. + * + * Note that the underlying I/O operation is submitted but may not be completed when this method returns. + * + * @return the index of the new log entry. + */ + long append(long term, RaftConfiguration configuration); + + /** + * Append asynchronously an entry. + * Used by the leader and the followers. + */ + CompletableFuture<Long> appendEntry(LogEntryProto entry); + + /** + * Append asynchronously all the given log entries. + * Used by the followers. + * + * If an existing entry conflicts with a new one (same index but different terms), + * delete the existing entry and all entries that follow it (§5.3). + */ + List<CompletableFuture<Long>> append(LogEntryProto... entries); + + /** + * Truncate asynchronously the log entries till the given index (inclusive). + * Used by the leader and the followers. 
+ */ + CompletableFuture<Long> truncate(long index); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1e8b2344/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 666b9c8..3aee7e1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -34,7 +34,6 @@ import org.apache.ratis.util.Preconditions; import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -253,7 +252,7 @@ public class SegmentedRaftLog extends RaftLog { * {@link #append(LogEntryProto...)} need protection of RaftServer's lock. */ @Override - CompletableFuture<Long> truncate(long index) { + CompletableFuture<Long> truncateImpl(long index) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { RaftLogCache.TruncationSegments ts = cache.truncate(index); @@ -266,8 +265,7 @@ public class SegmentedRaftLog extends RaftLog { } @Override - CompletableFuture<Long> appendEntry(LogEntryProto entry) { - + CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) { checkLogState(); if (LOG.isTraceEnabled()) { LOG.trace("{}: appendEntry {}", server.getId(), @@ -322,8 +320,7 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public List<CompletableFuture<Long>> append(LogEntryProto... entries) { - + public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) { checkLogState(); if (entries == null || entries.length == 0) { return Collections.emptyList();
