This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e493b60 RATIS-559. LogAppender.getPrevious throws
IllegalStateException.
e493b60 is described below
commit e493b60b795ce0ec41affbb5052d7c6a3255dfff
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri May 24 14:06:50 2019 +0800
RATIS-559. LogAppender.getPrevious throws IllegalStateException.
---
.../org/apache/ratis/server/impl/LogAppender.java | 51 +++++++++++++++++-----
.../ratis/server/impl/RaftServerConstants.java | 6 ++-
.../org/apache/ratis/server/raftlog/RaftLog.java | 3 ++
3 files changed, 48 insertions(+), 12 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index ff13136..d5a6ec0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -169,23 +169,33 @@ public class LogAppender {
return getFollower().getPeer().getId();
}
- private TermIndex getPrevious() {
- final long nextIndex = follower.getNextIndex();
- final TermIndex previous = raftLog.getTermIndex(nextIndex - 1);
+ private TermIndex getPrevious(long nextIndex) {
+ if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
+ return null;
+ }
+
+ final long previousIndex = nextIndex - 1;
+ final TermIndex previous = raftLog.getTermIndex(previousIndex);
if (previous != null) {
return previous;
}
- final long logStartIndex = raftLog.getStartIndex();
- Preconditions.assertTrue(nextIndex == logStartIndex,
- "%s: follower's nextIndex = %s != logStartIndex = %s", this,
nextIndex, logStartIndex);
+
final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
- return snapshot == null ? null : snapshot.getTermIndex();
+ if (snapshot != null) {
+ final TermIndex snapshotTermIndex = snapshot.getTermIndex();
+ if (snapshotTermIndex.getIndex() == previousIndex) {
+ return snapshotTermIndex;
+ }
+ }
+
+ return null;
}
protected AppendEntriesRequestProto createRequest(long callId) throws
RaftLogIOException {
- final TermIndex previous = getPrevious();
+ final TermIndex previous = getPrevious(follower.getNextIndex());
final long heartbeatRemainingMs = getHeartbeatRemainingTime();
if (heartbeatRemainingMs <= 0L) {
+ // heartbeat
return leaderState.newAppendEntriesRequestProto(
getFollowerId(), previous, Collections.emptyList(),
!follower.isAttendingVote(), callId);
}
@@ -193,7 +203,9 @@ public class LogAppender {
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " +
buffer.getNumElements() + " elements.");
final long leaderNext = raftLog.getNextIndex();
- for (long next = follower.getNextIndex(); leaderNext > next; ) {
+ final long followerNext = follower.getNextIndex();
+ final long halfMs = heartbeatRemainingMs/2;
+ for (long next = followerNext; leaderNext > next &&
getHeartbeatRemainingTime() - halfMs > 0; ) {
if (!buffer.offer(raftLog.getEntryWithData(next++))) {
break;
}
@@ -202,13 +214,30 @@ public class LogAppender {
return null;
}
- final List<LogEntryProto> protos = buffer.pollList(heartbeatRemainingMs,
EntryWithData::getEntry,
- (entry, time, exception) -> LOG.warn(this + ": Failed get " + entry +
" in " + time, exception));
+ final List<LogEntryProto> protos =
buffer.pollList(getHeartbeatRemainingTime(), EntryWithData::getEntry,
+ (entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
+ follower.getName(), entry, time, exception));
buffer.clear();
+ assertProtos(protos, followerNext, previous);
return leaderState.newAppendEntriesRequestProto(
getFollowerId(), previous, protos, !follower.isAttendingVote(),
callId);
}
+ private void assertProtos(List<LogEntryProto> protos, long nextIndex,
TermIndex previous) {
+ if (protos.isEmpty()) {
+ return;
+ }
+ final long firstIndex = protos.get(0).getIndex();
+ Preconditions.assertTrue(firstIndex == nextIndex,
+ () -> follower.getName() + ": firstIndex = " + firstIndex + " !=
nextIndex = " + nextIndex);
+ if (firstIndex > RaftLog.LEAST_VALID_LOG_INDEX) {
+ Objects.requireNonNull(previous,
+ () -> follower.getName() + ": Previous TermIndex not found for
firstIndex = " + firstIndex);
+ Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
+ () -> follower.getName() + ": Previous = " + previous + " but
firstIndex = " + firstIndex);
+ }
+ }
+
/** Send an appendEntries RPC; retry indefinitely. */
private AppendEntriesReplyProto sendAppendEntriesWithRetries()
throws InterruptedException, InterruptedIOException, RaftLogIOException {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index f70f98a..42eaed2 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -17,8 +17,12 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.server.raftlog.RaftLog;
+
public interface RaftServerConstants {
- long INVALID_LOG_INDEX = -1;
+ /** @deprecated use {@link RaftLog#LEAST_VALID_LOG_INDEX} - 1. */
+ @Deprecated
+ long INVALID_LOG_INDEX = RaftLog.LEAST_VALID_LOG_INDEX - 1;
long DEFAULT_CALLID = 0;
enum StartupOption {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index bc35014..fe7625d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -60,6 +60,9 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}",
getSelfId(), s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}",
getSelfId(), s);
+ /** The least valid log index, i.e. the index used when writing to an empty
log. */
+ public static final long LEAST_VALID_LOG_INDEX = 0L;
+
/**
* The largest committed index. Note the last committed log may be included
* in the latest snapshot file.