This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 4c68f568c92d2f9a0c907cb4b5bcac2163bd3e0d Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Jun 10 03:00:49 2022 -0700 RATIS-1590. Include the new log index in Watch reply. (#657) (cherry picked from commit c1032750ae2d28b7504a2a9f6b917e290927b576) --- .../java/org/apache/ratis/client/api/AsyncApi.java | 3 +++ .../org/apache/ratis/client/api/BlockingApi.java | 3 +++ .../main/java/org/apache/ratis/util/IOUtils.java | 17 +++++++++++-- .../apache/ratis/grpc/client/GrpcClientRpc.java | 2 +- .../apache/ratis/server/impl/LeaderStateImpl.java | 2 +- .../apache/ratis/server/impl/WatchRequests.java | 29 ++++++++++------------ .../java/org/apache/ratis/WatchRequestTests.java | 7 +++--- 7 files changed, 39 insertions(+), 24 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java index fd000ad2..1e8da9bd 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java @@ -66,6 +66,9 @@ public interface AsyncApi { * @param index The log index to be watched. * @param replication The replication level required. * @return a future of the reply. + * When {@link RaftClientReply#isSuccess()} == true, + * the reply index (i.e. {@link RaftClientReply#getLogIndex()}) is the log index satisfying the request, + * where reply index >= watch index. */ CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel replication); } diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java index e1679b97..84f9740e 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java @@ -66,6 +66,9 @@ public interface BlockingApi { * @param index The log index to be watched. * @param replication The replication level required. * @return the reply. + * When {@link RaftClientReply#isSuccess()} == true, + * the reply index (i.e. {@link RaftClientReply#getLogIndex()}) is the log index satisfying the request, + * where reply index >= watch index. */ RaftClientReply watch(long index, ReplicationLevel replication) throws IOException; } \ No newline at end of file diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index bd50a87a..0153ac49 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -214,12 +214,25 @@ public interface IOUtils { return readObject(new ByteArrayInputStream(bytes), clazz); } + /** + * Read an object from the given input stream. + * + * @param in input stream to read from. + * @param clazz the class of the object. + * @return the object read from the stream. + * + * @param <T> The class type. + */ static <T> T readObject(InputStream in, Class<T> clazz) { - Object obj = null; + final Object obj; try(ObjectInputStream oin = new ObjectInputStream(in)) { obj = oin.readObject(); - return clazz.cast(obj); } catch (IOException | ClassNotFoundException e) { + throw new IllegalStateException("Failed to readObject for class " + clazz, e); + } + try { + return clazz.cast(obj); + } catch (ClassCastException e) { throw new IllegalStateException("Failed to cast to " + clazz + ", object=" + (obj instanceof Throwable? StringUtils.stringifyException((Throwable) obj): obj), e); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 8d62fdb1..b825429a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -82,7 +82,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie // Reuse the same grpc stream for all async calls. return proxy.getUnorderedAsyncStreamObservers().onNext(request); } catch (Exception e) { - LOG.error(clientId + ": XXX Failed " + request, e); + LOG.error(clientId + ": Failed " + request, e); return JavaUtils.completeExceptionally(e); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 068c7e5c..7265f3a8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -419,7 +419,7 @@ class LeaderStateImpl implements LeaderState { CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) { LOG.debug("{}: addWatchRequest {}", this, request); return watchRequests.add(request) - .thenApply(v -> server.newSuccessReply(request)) + .thenApply(logIndex -> server.newSuccessReply(request, logIndex)) .exceptionally(e -> exception2RaftClientReply(request, e)); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java index 4d3b985d..3b95d4bf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.server.impl; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto; @@ -45,14 +44,14 @@ class WatchRequests { static class PendingWatch { private final WatchRequestTypeProto watch; private final Timestamp creationTime; - private final Supplier<CompletableFuture<Void>> future = JavaUtils.memoize(CompletableFuture::new); + private final Supplier<CompletableFuture<Long>> future = JavaUtils.memoize(CompletableFuture::new); PendingWatch(WatchRequestTypeProto watch, Timestamp creationTime) { this.watch = watch; this.creationTime = creationTime; } - CompletableFuture<Void> getFuture() { + CompletableFuture<Long> getFuture() { return future.get(); } @@ -87,16 +86,17 @@ class WatchRequests { return index; } - CompletableFuture<Void> add(RaftClientRequest request) { + CompletableFuture<Long> add(RaftClientRequest request) { final long currentTime = Timestamp.currentTimeNanos(); final long roundUp = watchTimeoutDenominationNanos.roundUpNanos(currentTime); final PendingWatch pending = new PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp)); final PendingWatch computed; synchronized (this) { - if (pending.getIndex() <= getIndex()) { // compare again synchronized + final long queueIndex = getIndex(); + if (pending.getIndex() <= queueIndex) { // compare again synchronized // watch condition already satisfied - return null; + return CompletableFuture.completedFuture(queueIndex); } computed = q.compute(pending, (key, old) -> old != null? old: resource.tryAcquire()? pending: null); } @@ -136,7 +136,6 @@ class WatchRequests { return true; } - @SuppressFBWarnings("NP_NULL_PARAM_DEREF") synchronized void updateIndex(final long newIndex) { if (newIndex <= getIndex()) { // compare again synchronized return; @@ -152,7 +151,7 @@ class WatchRequests { final boolean removed = removeExisting(first); Preconditions.assertTrue(removed); LOG.debug("{}: complete {}", name, first); - first.getFuture().complete(null); + first.getFuture().complete(newIndex); } } @@ -187,17 +186,15 @@ class WatchRequests { Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r, elementLimit))); } - @SuppressFBWarnings("NP_NULL_PARAM_DEREF") - CompletableFuture<Void> add(RaftClientRequest request) { + CompletableFuture<Long> add(RaftClientRequest request) { final WatchRequestTypeProto watch = request.getType().getWatch(); final WatchQueue queue = queues.get(watch.getReplication()); - if (watch.getIndex() > queue.getIndex()) { // compare without synchronization - final CompletableFuture<Void> future = queue.add(request); - if (future != null) { - return future; - } + final long queueIndex = queue.getIndex(); + if (watch.getIndex() <= queueIndex) { // compare without synchronization + // watch condition already satisfied + return CompletableFuture.completedFuture(queueIndex); } - return CompletableFuture.completedFuture(null); + return queue.add(request); } void update(ReplicationLevel replication, final long newIndex) { diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index 9ac50129..266716b0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -208,6 +208,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> throw e; } log.info("{}-Watch({}) returns {}", name, logIndex, reply); + + Assert.assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.getLogIndex() >= logIndex); return reply; } } @@ -272,10 +275,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); Assert.assertEquals(logIndex, watchReplies.logIndex); final RaftClientReply watchMajorityReply = watchReplies.getMajority(); - Assert.assertTrue(watchMajorityReply.isSuccess()); final RaftClientReply watchMajorityCommittedReply = watchReplies.getMajorityCommitted(); - Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos); @@ -299,10 +300,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final long logIndex = watchReplies.logIndex; LOG.info("checkAll {}: logIndex={}", i, logIndex); final RaftClientReply watchAllReply = watchReplies.getAll(); - Assert.assertTrue(watchAllReply.isSuccess()); final RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted(); - Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos);
