This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit bb0f0021f68c8b2c011be0de23b258ae24151848 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Aug 10 01:43:08 2022 -0700 RATIS-1663. Record call id for board casting a heartbeat. (#706) (cherry picked from commit 5dd3c1db093bb06e462afbd0df4b8b215bbd8bf3) --- .../src/main/java/org/apache/ratis/rpc/CallId.java | 12 ++++ .../apache/ratis/grpc/server/GrpcLogAppender.java | 28 +++++++-- .../apache/ratis/server/leader/LogAppender.java | 7 +++ .../org/apache/ratis/server/impl/ReadRequests.java | 66 ++++++++++++++++++++++ .../ratis/server/leader/LogAppenderDefault.java | 16 +++++- 5 files changed, 120 insertions(+), 9 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java index a6914e27b..85e6ef06b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java +++ b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.rpc; +import java.util.Comparator; import java.util.concurrent.atomic.AtomicLong; /** @@ -27,6 +28,17 @@ import java.util.concurrent.atomic.AtomicLong; public final class CallId { private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); + private static final Comparator<Long> COMPARATOR = (left, right) -> { + final long diff = left - right; + // check diff < Long.MAX_VALUE/2 for the possibility of numerical overflow + return diff == 0? 0: diff > 0 && diff < Long.MAX_VALUE/2? 1: -1; + }; + + /** @return a long comparator, which takes care the possibility of numerical overflow, for comparing call ids. */ + public static Comparator<Long> getComparator() { + return COMPARATOR; + } + /** @return the default value. */ public static long getDefault() { return 0; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 3f01ea299..3e33a1787 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -47,6 +47,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ratis.thirdparty.com.codahale.metrics.Timer; @@ -56,9 +57,16 @@ import org.apache.ratis.thirdparty.com.codahale.metrics.Timer; public class GrpcLogAppender extends LogAppenderBase { public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class); + private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> { + // calculate diff in order to take care the possibility of numerical overflow + final long diff = left - right; + return diff == 0? 0: diff > 0? 1: -1; + }; + + private final AtomicLong callId = new AtomicLong(); + private final RequestMap pendingRequests = new RequestMap(); private final int maxPendingRequestsNum; - private long callId = 0; private volatile boolean firstResponseReceived = false; private final boolean installSnapshotEnabled; @@ -235,15 +243,23 @@ public class GrpcLogAppender extends LogAppenderBase { } } + @Override + public long getCallId() { + return callId.get(); + } + + @Override + public Comparator<Long> getCallIdComparator() { + return CALL_ID_COMPARATOR; + } + private void appendLog(boolean excludeLogEntries) throws IOException { final AppendEntriesRequestProto pending; final AppendEntriesRequest request; - final StreamObserver<AppendEntriesRequestProto> s; try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { - // prepare and enqueue the append request. note changes on follower's - // nextIndex and ops on pendingRequests should always be associated - // together and protected by the lock - pending = newAppendEntriesRequest(callId++, excludeLogEntries); + // Prepare and send the append request. + // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock + pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries); if (pending == null) { return; } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index 135b4318d..ef5e1a7ed 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Comparator; /** * A {@link LogAppender} is for the leader to send appendEntries to a particular follower. @@ -81,6 +82,12 @@ public interface LogAppender { return getFollower().getPeer().getId(); } + /** @return the call id for the next {@link AppendEntriesRequestProto}. */ + long getCallId(); + + /** @return the a {@link Comparator} for comparing call ids. */ + Comparator<Long> getCallIdComparator(); + /** * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}. * The {@link AppendEntriesRequestProto} object may contain zero or more log entries. diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java new file mode 100644 index 000000000..b8d8998c7 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.server.leader.LogAppender; + +/** For supporting linearizable read. */ +class ReadRequests { + /** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */ + static class HeartbeatAck { + private final LogAppender appender; + private final long minCallId; + private volatile boolean acknowledged = false; + + HeartbeatAck(LogAppender appender) { + this.appender = appender; + this.minCallId = appender.getCallId(); + } + + /** Is the heartbeat (for a particular call id) acknowledged? */ + boolean isAcknowledged() { + return acknowledged; + } + + /** + * @return true if the acknowledged state is changed from false to true; + * otherwise, the acknowledged state remains unchanged, return false. + */ + boolean receive(AppendEntriesReplyProto reply) { + if (acknowledged) { + return false; + } + synchronized (this) { + if (!acknowledged && isValid(reply)) { + acknowledged = true; + return true; + } + return false; + } + } + + private boolean isValid(AppendEntriesReplyProto reply) { + if (reply == null || !reply.getServerReply().getSuccess()) { + return false; + } + // valid only if the reply has a later call id than the min. + return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0; + } + } +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index c9d341409..0c91427e5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.server.leader; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; @@ -30,6 +29,7 @@ import org.apache.ratis.statemachine.SnapshotInfo; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Comparator; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -42,12 +42,22 @@ class LogAppenderDefault extends LogAppenderBase { super(server, leaderState, f); } + @Override + public long getCallId() { + return CallId.get(); + } + + @Override + public Comparator<Long> getCallIdComparator() { + return CallId.getComparator(); + } + /** Send an appendEntries RPC; retry indefinitely. */ - @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") private AppendEntriesReplyProto sendAppendEntriesWithRetries() throws InterruptedException, InterruptedIOException, RaftLogIOException { int retry = 0; - AppendEntriesRequestProto request = null; + + AppendEntriesRequestProto request = newAppendEntriesRequest(CallId.getAndIncrement(), false); while (isRunning()) { // keep retrying for IOException try { if (request == null || request.getEntriesCount() == 0) {
