Repository: incubator-ratis Updated Branches: refs/heads/master 9a8daa1d5 -> c1b23fdb0
RATIS-124. RaftLog should be sync'ed on all the entries appended but not only the last entry. Contributed by Kit Hui Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c1b23fdb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c1b23fdb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c1b23fdb Branch: refs/heads/master Commit: c1b23fdb0bfe5d3e9937d60f0baa43772737f002 Parents: 9a8daa1 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Tue Dec 12 18:07:25 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Tue Dec 12 18:07:25 2017 +0800 ---------------------------------------------------------------------- .../ratis/server/impl/RaftServerImpl.java | 15 +++--- .../ratis/server/storage/MemoryRaftLog.java | 33 ++++++------ .../apache/ratis/server/storage/RaftLog.java | 17 +++--- .../ratis/server/storage/RaftLogWorker.java | 23 ++++---- .../ratis/server/storage/SegmentedRaftLog.java | 55 ++++++++++---------- .../apache/ratis/statemachine/StateMachine.java | 3 +- .../ratis/server/storage/TestCacheEviction.java | 7 ++- .../server/storage/TestSegmentedRaftLog.java | 20 +++---- 8 files changed, 81 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index bc45b5f..8b784e6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -25,6 +25,7 @@ import org.apache.ratis.server.RaftServerRpc; import 
org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -723,6 +723,8 @@ public class RaftServerImpl implements RaftServerProtocol, throw new IOException(e); } + final List<CompletableFuture<Long>> futures; + final long currentTerm; long nextIndex = state.getLog().getNextIndex(); synchronized (this) { @@ -764,16 +766,17 @@ public class RaftServerImpl implements RaftServerProtocol, return reply; } - state.getLog().append(entries); + futures = state.getLog().append(entries); + state.updateConfiguration(entries); state.updateStatemachine(leaderCommit, currentTerm); } if (entries.length > 0) { - try { - state.getLog().logSync(); - } catch (InterruptedException e) { - throw new InterruptedIOException("logSync got interrupted"); + CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null); + for (CompletableFuture future : futures) { + future.join(); } + nextIndex = entries[entries.length - 1].getIndex() + 1; } synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index ecbae2e..3c14d2c 100644 --- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -17,9 +17,6 @@ */ package org.apache.ratis.server.storage; -import java.util.ArrayList; -import java.util.List; - import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; @@ -27,10 +24,14 @@ import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; -import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + /** * A simple RaftLog implementation in memory. Used only for testing. */ @@ -79,7 +80,7 @@ public class MemoryRaftLog extends RaftLog { } @Override - void truncate(long index) { + CompletableFuture<Long> truncate(long index) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { Preconditions.assertTrue(index >= 0); @@ -88,6 +89,7 @@ public class MemoryRaftLog extends RaftLog { entries.remove(i); } } + return CompletableFuture.completedFuture(index); } @Override @@ -100,11 +102,12 @@ public class MemoryRaftLog extends RaftLog { } @Override - void appendEntry(LogEntryProto entry) { + CompletableFuture<Long> appendEntry(LogEntryProto entry) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { entries.add(entry); } + return CompletableFuture.completedFuture(entry.getIndex()); } @Override @@ -126,11 +129,11 @@ public class MemoryRaftLog extends RaftLog { } @Override - public void append(LogEntryProto... entries) { + public List<CompletableFuture<Long>> append(LogEntryProto... 
entries) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { if (entries == null || entries.length == 0) { - return; + return Collections.emptyList(); } // Before truncating the entries, we first need to check if some // entries are duplicated. If the leader sends entry 6, entry 7, then @@ -151,12 +154,18 @@ public class MemoryRaftLog extends RaftLog { break; } } + final List<CompletableFuture<Long>> futures; if (toTruncate) { - truncate(truncateIndex); + futures = new ArrayList<>(entries.length - index + 1); + futures.add(truncate(truncateIndex)); + } else { + futures = new ArrayList<>(entries.length - index); } for (int i = index; i < entries.length; i++) { this.entries.add(entries[i]); + futures.add(CompletableFuture.completedFuture(entries[i].getIndex())); } + return futures; } } @@ -171,12 +180,6 @@ public class MemoryRaftLog extends RaftLog { } @Override - public void logSync() { - CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null); - // do nothing - } - - @Override public long getLatestFlushedIndex() { return getNextIndex() - 1; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index ac86582..8edb8a1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -35,10 +35,11 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; - /** * Base 
class of RaftLog. Currently we provide two types of RaftLog * implementation: @@ -207,13 +208,13 @@ public abstract class RaftLog implements Closeable { * Truncate the log entries till the given index. The log with the given index * will also be truncated (i.e., inclusive). */ - abstract void truncate(long index); + abstract CompletableFuture<Long> truncate(long index); /** * Used by the leader when appending a new entry based on client's request * or configuration change. */ - abstract void appendEntry(LogEntryProto entry); + abstract CompletableFuture<Long> appendEntry(LogEntryProto entry); /** * Append all the given log entries. Used by the followers. @@ -224,15 +225,9 @@ public abstract class RaftLog implements Closeable { * This method, {@link #append(long, TransactionContext, ClientId, long)}, * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)}, * do not guarantee the changes are persisted. - * Need to call {@link #logSync()} to persist the changes. - */ - public abstract void append(LogEntryProto... entries); - - /** - * Flush and sync the log. - * It is triggered by AppendEntries RPC request from the leader. + * Need to wait for the returned futures to persist the changes. */ - public abstract void logSync() throws InterruptedException; + public abstract List<CompletableFuture<Long>> append(LogEntryProto... 
entries); /** * @return the index of the latest entry that has been flushed to the local http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 2e1177f..81bd22d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -37,7 +37,10 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -267,31 +270,23 @@ class RaftLogWorker implements Runnable { private class WriteLog extends Task { private final LogEntryProto entry; private final CompletableFuture<?> stateMachineFuture; + private final CompletableFuture<Long> combined; WriteLog(LogEntryProto entry) { this.entry = ProtoUtils.removeStateMachineData(entry); if (this.entry == entry || stateMachine == null) { this.stateMachineFuture = null; + this.combined = super.getFuture(); } else { // this.entry != entry iff the entry has state machine data this.stateMachineFuture = stateMachine.writeStateMachineData(entry); + this.combined = super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index); } } @Override - void waitForDone() throws InterruptedException { - super.waitForDone(); - // TODO: It does not work since logSync only wait for the last task L. - // TODO: If some task T earlier than L has a writeStateMachineData future, it will not be sync'ed. 
- // TODO: Need RATIS-124 - - if (stateMachineFuture != null) { - try { - stateMachineFuture.get(); - } catch (ExecutionException e) { - // ignore - } - } + CompletableFuture<Long> getFuture() { + return combined; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 5286738..76b2bd7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -29,14 +29,16 @@ import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; -import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Preconditions; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; /** @@ -73,19 +75,17 @@ public class SegmentedRaftLog extends RaftLog { * I/O task definitions. 
*/ static abstract class Task { - private boolean done = false; + private final CompletableFuture<Long> future = new CompletableFuture<>(); - synchronized void done() { - done = true; - notifyAll(); + CompletableFuture<Long> getFuture() { + return future; } - synchronized void waitForDone() throws InterruptedException { - while (!done) { - wait(); - } + void done() { + future.complete(getEndIndex()); } + abstract void execute() throws IOException; abstract long getEndIndex(); @@ -95,7 +95,6 @@ public class SegmentedRaftLog extends RaftLog { return getClass().getSimpleName() + ":" + getEndIndex(); } } - private static final ThreadLocal<Task> myTask = new ThreadLocal<>(); private final RaftServerImpl server; private final RaftStorage storage; @@ -232,19 +231,21 @@ public class SegmentedRaftLog extends RaftLog { * {@link #append(LogEntryProto...)} need protection of RaftServer's lock. */ @Override - void truncate(long index) { + CompletableFuture<Long> truncate(long index) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { RaftLogCache.TruncationSegments ts = cache.truncate(index); if (ts != null) { Task task = fileLogWorker.truncate(ts); - myTask.set(task); + return task.getFuture(); } } + return CompletableFuture.completedFuture(index); } @Override - void appendEntry(LogEntryProto entry) { + CompletableFuture<Long> appendEntry(LogEntryProto entry) { + checkLogState(); if (LOG.isTraceEnabled()) { LOG.trace("{}: appendEntry {}", server.getId(), @@ -272,7 +273,7 @@ public class SegmentedRaftLog extends RaftLog { } cache.appendEntry(entry); - myTask.set(fileLogWorker.writeLogEntry(entry)); + return fileLogWorker.writeLogEntry(entry).getFuture(); } } @@ -289,11 +290,13 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public void append(LogEntryProto... entries) { + public List<CompletableFuture<Long>> append(LogEntryProto... 
entries) { + checkLogState(); if (entries == null || entries.length == 0) { - return; + return Collections.emptyList(); } + try(AutoCloseableLock writeLock = writeLock()) { Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex()); int index = 0; @@ -318,25 +321,21 @@ public class SegmentedRaftLog extends RaftLog { break; } } + + final List<CompletableFuture<Long>> futures; if (truncateIndex != -1) { - // truncate from truncateIndex - truncate(truncateIndex); + futures = new ArrayList<>(entries.length - index + 1); + futures.add(truncate(truncateIndex)); + } else { + futures = new ArrayList<>(entries.length - index); } - // append from entries[index] for (int i = index; i < entries.length; i++) { - appendEntry(entries[i]); + futures.add(appendEntry(entries[i])); } + return futures; } } - @Override - public void logSync() throws InterruptedException { - CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null); - final Task task = myTask.get(); - if (task != null) { - task.waitForDone(); - } - } @Override public long getLatestFlushedIndex() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index ae26cb1..1d47815 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -128,8 +128,7 @@ public interface StateMachine extends Closeable { /** * Write asynchronously the state machine data to this state machine. 
* - * @return a future for the write task - * if {@link RaftLog#logSync()} should also sync writing the state machine data; + * @return a future for the write task if the state machine data should be sync'ed; * otherwise, return null. */ default CompletableFuture<?> writeStateMachineData(LogEntryProto entry) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java index 8ce33f5..c94fb73 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -41,6 +41,7 @@ import org.mockito.Mockito; import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; public class TestCacheEviction extends BaseTest { private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault(); @@ -168,8 +169,7 @@ public class TestCacheEviction extends BaseTest { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0); LogEntryProto[] entries = generateEntries(slist); - raftLog.append(entries); - raftLog.logSync(); + raftLog.append(entries).forEach(CompletableFuture::join); // check the current cached segment number: the last segment is still open Assert.assertEquals(maxCachedNum - 1, @@ -179,8 +179,7 @@ public class TestCacheEviction extends BaseTest { Mockito.when(state.getLastAppliedIndex()).thenReturn(35L); slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum); entries = generateEntries(slist); - raftLog.append(entries); - raftLog.logSync(); + 
raftLog.append(entries).forEach(CompletableFuture::join); // check the cached segment number again. since the slowest follower is on // index 21, the eviction should happen and evict 3 segments http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 09b31c1..2a0ec60 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class TestSegmentedRaftLog extends BaseTest { @@ -192,8 +193,7 @@ public class TestSegmentedRaftLog extends BaseTest { new SegmentedRaftLog(peerId, null, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); } try (SegmentedRaftLog raftLog = @@ -221,8 +221,7 @@ public class TestSegmentedRaftLog extends BaseTest { new SegmentedRaftLog(peerId, null, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); } try (SegmentedRaftLog raftLog = @@ -244,8 +243,7 @@ public class TestSegmentedRaftLog extends BaseTest { new SegmentedRaftLog(peerId, null, storage, 
-1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); } for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) { @@ -259,8 +257,8 @@ public class TestSegmentedRaftLog extends BaseTest { new SegmentedRaftLog(peerId, null, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // truncate the log - raftLog.truncate(fromIndex); - raftLog.logSync(); + raftLog.truncate(fromIndex).join(); + checkEntries(raftLog, entries, 0, (int) fromIndex); } @@ -317,8 +315,7 @@ public class TestSegmentedRaftLog extends BaseTest { new SegmentedRaftLog(peerId, null, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); } // append entries whose first 100 entries are the same with existing log, @@ -332,8 +329,7 @@ public class TestSegmentedRaftLog extends BaseTest { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])); - raftLog.logSync(); + raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join); checkEntries(raftLog, entries, 0, 650); checkEntries(raftLog, newEntries, 100, 100);
