This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 051fe4e RATIS-595. appendEntry future should be completed only after
the entry is flushed.
051fe4e is described below
commit 051fe4e657d542f88c0dabbbd5a90908295f8964
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jun 26 16:35:00 2019 +0800
RATIS-595. appendEntry future should be completed only after the entry is
flushed.
---
.../org/apache/ratis/server/impl/LeaderState.java | 2 +-
.../ratis/server/impl/RaftServerConstants.java | 4 +-
.../org/apache/ratis/server/raftlog/RaftLog.java | 8 +--
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 2 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 18 +++--
.../raftlog/segmented/SegmentedRaftLogWorker.java | 78 ++++++++++++++++++----
.../server/impl/RaftReconfigurationBaseTest.java | 2 +-
.../ratis/server/storage/RaftStorageTestUtils.java | 2 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 36 +++++-----
9 files changed, 105 insertions(+), 47 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 13b7c8e..2271908 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -560,7 +560,7 @@ public class LeaderState {
}
private void updateCommit() {
- getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getLatestFlushedIndex)
+ getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getFlushIndex)
.ifPresent(m -> updateCommit(m.majority, m.min));
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index 42eaed2..da05037 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -20,9 +20,9 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.server.raftlog.RaftLog;
public interface RaftServerConstants {
- /** @deprecated use {@link RaftLog#LEAST_VALID_LOG_INDEX} - 1. */
+ /** @deprecated use {@link RaftLog#INVALID_LOG_INDEX}. */
@Deprecated
- long INVALID_LOG_INDEX = RaftLog.LEAST_VALID_LOG_INDEX - 1;
+ long INVALID_LOG_INDEX = RaftLog.INVALID_LOG_INDEX;
long DEFAULT_CALLID = 0;
enum StartupOption {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index ae357dc..01474fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -64,6 +64,7 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
/** The least valid log index, i.e. the index used when writing to an empty
log. */
public static final long LEAST_VALID_LOG_INDEX = 0L;
+ public static final long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
/**
* The largest committed index. Note the last committed log may be included
@@ -118,7 +119,7 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
// paper for details.
final TermIndex entry = getTermIndex(majorityIndex);
if (entry != null && entry.getTerm() == currentTerm) {
- final long newCommitIndex = Math.min(majorityIndex,
getLatestFlushedIndex());
+ final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
if (newCommitIndex > oldCommittedIndex) {
commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
}
@@ -368,10 +369,9 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto...
entries);
/**
- * @return the index of the latest entry that has been flushed to the local
- * storage.
+ * @return the index of the last entry that has been flushed to the local
storage.
*/
- public abstract long getLatestFlushedIndex();
+ public abstract long getFlushIndex();
/**
* Write and flush the metadata (votedFor and term) into the meta file.
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index f203282..f07aa1a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -203,7 +203,7 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public long getLatestFlushedIndex() {
+ public long getFlushIndex() {
return getNextIndex() - 1;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 7ea75ce..6db41c7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -35,6 +35,7 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.StringUtils;
import java.io.File;
import java.io.IOException;
@@ -83,8 +84,13 @@ public class SegmentedRaftLog extends RaftLog {
}
void done() {
- Preconditions.assertTrue(!future.isDone());
- future.complete(getEndIndex());
+ completeFuture();
+ }
+
+ final void completeFuture() {
+ final boolean completed = future.complete(getEndIndex());
+ Preconditions.assertTrue(completed,
+ () -> this + " is already " +
StringUtils.completableFuture2String(future, false));
}
void failed(IOException e) {
@@ -231,7 +237,7 @@ public class SegmentedRaftLog extends RaftLog {
// segment's cache, should block the new entry appending or new segment
// allocation.
final RaftServerImpl s = server.get();
- cache.evictCache(s.getFollowerNextIndices(),
fileLogWorker.getFlushedIndex(), s.getState().getLastAppliedIndex());
+ cache.evictCache(s.getFollowerNextIndices(),
fileLogWorker.getFlushIndex(), s.getState().getLastAppliedIndex());
}
}
@@ -386,8 +392,8 @@ public class SegmentedRaftLog extends RaftLog {
@Override
- public long getLatestFlushedIndex() {
- return fileLogWorker.getFlushedIndex();
+ public long getFlushIndex() {
+ return fileLogWorker.getFlushIndex();
}
@Override
@@ -434,7 +440,7 @@ public class SegmentedRaftLog extends RaftLog {
public String toString() {
try(AutoCloseableLock readLock = readLock()) {
if (isOpened()) {
- return super.toString() + ",f" + getLatestFlushedIndex()
+ return super.toString() + ",f" + getFlushIndex()
+ ",i" +
Optional.ofNullable(getLastEntryTermIndex()).map(TermIndex::getIndex).orElse(0L);
} else {
return super.toString();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index c4f6aa9..e3d181d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -26,11 +26,12 @@ import org.apache.ratis.metrics.impl.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.server.storage.RaftStorage;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
@@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.function.Supplier;
/**
@@ -92,11 +96,48 @@ class SegmentedRaftLogWorker implements Runnable {
}
}
+ static class WriteLogTasks {
+ private final Queue<WriteLog> q = new LinkedList<>();
+ private volatile long index;
+
+ void offerOrCompleteFuture(WriteLog writeLog) {
+ if (writeLog.getEndIndex() <= index || !offer(writeLog)) {
+ writeLog.completeFuture();
+ }
+ }
+
+ private synchronized boolean offer(WriteLog writeLog) {
+ if (writeLog.getEndIndex() <= index) { // compare again synchronized
+ return false;
+ }
+ q.offer(writeLog);
+ return true;
+ }
+
+ synchronized void updateIndex(long i) {
+ index = i;
+
+ for(;;) {
+ final Task peeked = q.peek();
+ if (peeked == null || peeked.getEndIndex() > index) {
+ return;
+ }
+ final Task polled = q.poll();
+ Preconditions.assertTrue(polled == peeked);
+ polled.completeFuture();
+ }
+ }
+ }
+
+ private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}",
this, s);
+ private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}",
this, s);
+
private final String name;
/**
* The task queue accessed by rpc handler threads and the io worker thread.
*/
private final DataBlockingQueue<Task> queue;
+ private final WriteLogTasks writeTasks = new WriteLogTasks();
private volatile boolean running = true;
private final Thread workerThread;
@@ -114,7 +155,7 @@ class SegmentedRaftLogWorker implements Runnable {
/** the index of the last entry that has been written */
private long lastWrittenIndex;
/** the largest index of the entry that has been flushed */
- private volatile long flushedIndex;
+ private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0);
private final int forceSyncNum;
@@ -167,7 +208,7 @@ class SegmentedRaftLogWorker implements Runnable {
void start(long latestIndex, File openSegmentFile) throws IOException {
LOG.trace("{} start(latestIndex={}, openSegmentFile={})", name,
latestIndex, openSegmentFile);
lastWrittenIndex = latestIndex;
- flushedIndex = latestIndex;
+ flushIndex.setUnconditionally(latestIndex, infoIndexChange);
if (openSegmentFile != null) {
Preconditions.assertTrue(openSegmentFile.exists());
out = new SegmentedRaftLogOutputStream(openSegmentFile, true,
segmentMaxSize,
@@ -194,7 +235,7 @@ class SegmentedRaftLogWorker implements Runnable {
void syncWithSnapshot(long lastSnapshotIndex) {
queue.clear();
lastWrittenIndex = lastSnapshotIndex;
- flushedIndex = lastSnapshotIndex;
+ flushIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);
pendingFlushNum = 0;
}
@@ -312,13 +353,18 @@ class SegmentedRaftLogWorker implements Runnable {
} finally {
timerContext.stop();
}
- updateFlushedIndex();
+ updateFlushedIndexIncreasingly();
}
}
- private void updateFlushedIndex() {
- LOG.debug("{}: updateFlushedIndex {} -> {}", name, flushedIndex,
lastWrittenIndex);
- flushedIndex = lastWrittenIndex;
+ private void updateFlushedIndexIncreasingly() {
+ final long i = lastWrittenIndex;
+ flushIndex.updateIncreasingly(i, traceIndexChange);
+ postUpdateFlushedIndex();
+ writeTasks.updateIndex(i);
+ }
+
+ private void postUpdateFlushedIndex() {
pendingFlushNum = 0;
Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
}
@@ -421,6 +467,11 @@ class SegmentedRaftLogWorker implements Runnable {
}
@Override
+ void done() {
+ writeTasks.offerOrCompleteFuture(this);
+ }
+
+ @Override
public void execute() throws IOException {
if (stateMachineDataPolicy.isSync() && stateMachineFuture != null) {
stateMachineDataPolicy.getFromFuture(stateMachineFuture, () -> this +
"-writeStateMachineData");
@@ -477,7 +528,7 @@ class SegmentedRaftLogWorker implements Runnable {
FileUtils.deleteFile(openFile);
LOG.info("{}: Deleted empty log segment {}", name, openFile);
}
- updateFlushedIndex();
+ updateFlushedIndexIncreasingly();
}
@Override
@@ -589,7 +640,8 @@ class SegmentedRaftLogWorker implements Runnable {
if (stateMachineFuture != null) {
IOUtils.getFromFuture(stateMachineFuture, () -> this +
"-truncateStateMachineData");
}
- updateFlushedIndex();
+ flushIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
+ postUpdateFlushedIndex();
}
@Override
@@ -599,7 +651,7 @@ class SegmentedRaftLogWorker implements Runnable {
} else if (segments.toDelete.length > 0) {
return segments.toDelete[segments.toDelete.length - 1].endIndex;
}
- return RaftServerConstants.INVALID_LOG_INDEX;
+ return RaftLog.INVALID_LOG_INDEX;
}
@Override
@@ -608,7 +660,7 @@ class SegmentedRaftLogWorker implements Runnable {
}
}
- long getFlushedIndex() {
- return flushedIndex;
+ long getFlushIndex() {
+ return flushIndex.get();
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index d937950..ac022bf 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -532,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
}, 10, sleepTime, "confIndex", LOG);
// wait till the old leader persist the new conf
- JavaUtils.attempt(() -> log.getLatestFlushedIndex() >= confIndex,
+ JavaUtils.attempt(() -> log.getFlushIndex() >= confIndex,
10, sleepTime, "FLUSH", LOG);
final long committed = log.getLastCommittedIndex();
Assert.assertTrue(committed < confIndex);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index 07283b0..eb52869 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -43,7 +43,7 @@ public interface RaftStorageTestUtils {
final long flushed, committed;
try(AutoCloseableLock readlock = log.readLock()) {
last = log.getLastEntryTermIndex();
- flushed = log.getLatestFlushedIndex();
+ flushed = log.getFlushIndex();
committed = log.getLastCommittedIndex();
}
final StringBuilder b = new StringBuilder();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index cda1043..353dffb 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -163,7 +163,7 @@ public class TestSegmentedRaftLog extends BaseTest {
// create RaftLog object and load log file
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if log entries are loaded correctly
for (LogEntryProto e : entries) {
LogEntryProto entry = raftLog.get(e.getIndex());
@@ -219,21 +219,21 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
checkEntries(raftLog, entries, 0, entries.size());
}
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
TermIndex lastTermIndex = raftLog.getLastEntryTermIndex();
IllegalStateException ex = null;
try {
@@ -272,14 +272,14 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
checkEntries(raftLog, entries, 0, entries.size());
Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
@@ -294,7 +294,7 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
@@ -308,7 +308,7 @@ public class TestSegmentedRaftLog extends BaseTest {
throws Exception {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// truncate the log
raftLog.truncate(fromIndex).join();
@@ -318,7 +318,7 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
if (fromIndex > 0) {
Assert.assertEquals(entries.get((int) (fromIndex - 1)),
@@ -402,7 +402,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final RaftProperties p = new RaftProperties();
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null,
storage, -1, p)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
final Long purged = f.get();
@@ -426,7 +426,7 @@ public class TestSegmentedRaftLog extends BaseTest {
doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache,
entry));
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
@@ -442,7 +442,7 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
LOG.info("newEntries[0] = {}", newEntries.get(0));
final int last = newEntries.size() - 1;
LOG.info("newEntries[{}] = {}", last, newEntries.get(last));
@@ -454,19 +454,19 @@ public class TestSegmentedRaftLog extends BaseTest {
Assert.assertEquals(newEntries.get(newEntries.size() - 1),
getLastEntry(raftLog));
Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
- raftLog.getLatestFlushedIndex());
+ raftLog.getFlushIndex());
}
// load the raftlog again and check
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
Assert.assertEquals(newEntries.get(newEntries.size() - 1),
getLastEntry(raftLog));
Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
- raftLog.getLatestFlushedIndex());
+ raftLog.getFlushIndex());
SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
Assert.assertEquals(5, cache.getNumOfSegments());
@@ -480,7 +480,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm,
null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
int next = 0;
long flush = -1;
@@ -544,7 +544,7 @@ public class TestSegmentedRaftLog extends BaseTest {
doNothing().when(server).shutdown(false);
Throwable ex = null; // TimeoutIOException
try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, sm,
null, storage, -1, properties)) {
- raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// SegmentedRaftLogWorker should catch TimeoutIOException
CompletableFuture<Long> f = raftLog.appendEntry(entry);
// Wait for async writeStateMachineData to finish
@@ -567,7 +567,7 @@ public class TestSegmentedRaftLog extends BaseTest {
void assertIndices(RaftLog raftLog, long expectedFlushIndex, long
expectedNextIndex) {
LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
- Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex());
+ Assert.assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
LOG.info("assert expectedNextIndex={}", expectedNextIndex);
Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex());
}