This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 4c68f568c92d2f9a0c907cb4b5bcac2163bd3e0d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jun 10 03:00:49 2022 -0700

    RATIS-1590. Include the new log index in Watch reply. (#657)
    
    (cherry picked from commit c1032750ae2d28b7504a2a9f6b917e290927b576)
---
 .../java/org/apache/ratis/client/api/AsyncApi.java |  3 +++
 .../org/apache/ratis/client/api/BlockingApi.java   |  3 +++
 .../main/java/org/apache/ratis/util/IOUtils.java   | 17 +++++++++++--
 .../apache/ratis/grpc/client/GrpcClientRpc.java    |  2 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../apache/ratis/server/impl/WatchRequests.java    | 29 ++++++++++------------
 .../java/org/apache/ratis/WatchRequestTests.java   |  7 +++---
 7 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index fd000ad2..1e8da9bd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -66,6 +66,9 @@ public interface AsyncApi {
    * @param index The log index to be watched.
    * @param replication The replication level required.
    * @return a future of the reply.
+   *         When {@link RaftClientReply#isSuccess()} == true,
+   *         the reply index (i.e. {@link RaftClientReply#getLogIndex()}) is the log index satisfying the request,
+   *         where reply index >= watch index.
    */
   CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel replication);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index e1679b97..84f9740e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -66,6 +66,9 @@ public interface BlockingApi {
    * @param index The log index to be watched.
    * @param replication The replication level required.
    * @return the reply.
+   *         When {@link RaftClientReply#isSuccess()} == true,
+   *         the reply index (i.e. {@link RaftClientReply#getLogIndex()}) is the log index satisfying the request,
+   *         where reply index >= watch index.
    */
   RaftClientReply watch(long index, ReplicationLevel replication) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index bd50a87a..0153ac49 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -214,12 +214,25 @@ public interface IOUtils {
     return readObject(new ByteArrayInputStream(bytes), clazz);
   }
 
+  /**
+   * Read an object from the given input stream.
+   *
+   * @param in input stream to read from.
+   * @param clazz the class of the object.
+   * @return the object read from the stream.
+   *
+   * @param <T> The class type.
+   */
   static <T> T readObject(InputStream in, Class<T> clazz) {
-    Object obj = null;
+    final Object obj;
     try(ObjectInputStream oin = new ObjectInputStream(in)) {
       obj = oin.readObject();
-      return clazz.cast(obj);
     } catch (IOException | ClassNotFoundException e) {
+      throw new IllegalStateException("Failed to readObject for class " + clazz, e);
+    }
+    try {
+      return clazz.cast(obj);
+    } catch (ClassCastException e) {
       throw new IllegalStateException("Failed to cast to " + clazz + ", object="
               + (obj instanceof Throwable? StringUtils.stringifyException((Throwable) obj): obj), e);
     }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 8d62fdb1..b825429a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -82,7 +82,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
       // Reuse the same grpc stream for all async calls.
       return proxy.getUnorderedAsyncStreamObservers().onNext(request);
     } catch (Exception e) {
-      LOG.error(clientId + ": XXX Failed " + request, e);
+      LOG.error(clientId + ": Failed " + request, e);
       return JavaUtils.completeExceptionally(e);
     }
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 068c7e5c..7265f3a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -419,7 +419,7 @@ class LeaderStateImpl implements LeaderState {
   CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) {
     LOG.debug("{}: addWatchRequest {}", this, request);
     return watchRequests.add(request)
-        .thenApply(v -> server.newSuccessReply(request))
+        .thenApply(logIndex -> server.newSuccessReply(request, logIndex))
         .exceptionally(e -> exception2RaftClientReply(request, e));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 4d3b985d..3b95d4bf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.impl;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
@@ -45,14 +44,14 @@ class WatchRequests {
   static class PendingWatch {
     private final WatchRequestTypeProto watch;
     private final Timestamp creationTime;
-    private final Supplier<CompletableFuture<Void>> future = JavaUtils.memoize(CompletableFuture::new);
+    private final Supplier<CompletableFuture<Long>> future = JavaUtils.memoize(CompletableFuture::new);
 
     PendingWatch(WatchRequestTypeProto watch, Timestamp creationTime) {
       this.watch = watch;
       this.creationTime = creationTime;
     }
 
-    CompletableFuture<Void> getFuture() {
+    CompletableFuture<Long> getFuture() {
       return future.get();
     }
 
@@ -87,16 +86,17 @@ class WatchRequests {
       return index;
     }
 
-    CompletableFuture<Void> add(RaftClientRequest request) {
+    CompletableFuture<Long> add(RaftClientRequest request) {
       final long currentTime = Timestamp.currentTimeNanos();
       final long roundUp = watchTimeoutDenominationNanos.roundUpNanos(currentTime);
       final PendingWatch pending = new PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp));
 
       final PendingWatch computed;
       synchronized (this) {
-        if (pending.getIndex() <= getIndex()) { // compare again synchronized
+        final long queueIndex = getIndex();
+        if (pending.getIndex() <= queueIndex) { // compare again synchronized
           // watch condition already satisfied
-          return null;
+          return CompletableFuture.completedFuture(queueIndex);
         }
         computed = q.compute(pending, (key, old) -> old != null? old: resource.tryAcquire()? pending: null);
       }
@@ -136,7 +136,6 @@ class WatchRequests {
       return true;
     }
 
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     synchronized void updateIndex(final long newIndex) {
       if (newIndex <= getIndex()) { // compare again synchronized
         return;
@@ -152,7 +151,7 @@ class WatchRequests {
         final boolean removed = removeExisting(first);
         Preconditions.assertTrue(removed);
         LOG.debug("{}: complete {}", name, first);
-        first.getFuture().complete(null);
+        first.getFuture().complete(newIndex);
       }
     }
 
@@ -187,17 +186,15 @@ class WatchRequests {
     Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r, elementLimit)));
   }
 
-  @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
-  CompletableFuture<Void> add(RaftClientRequest request) {
+  CompletableFuture<Long> add(RaftClientRequest request) {
     final WatchRequestTypeProto watch = request.getType().getWatch();
     final WatchQueue queue = queues.get(watch.getReplication());
-    if (watch.getIndex() > queue.getIndex()) { // compare without synchronization
-      final CompletableFuture<Void> future = queue.add(request);
-      if (future != null) {
-        return future;
-      }
+    final long queueIndex = queue.getIndex();
+    if (watch.getIndex() <= queueIndex) { // compare without synchronization
+      // watch condition already satisfied
+      return CompletableFuture.completedFuture(queueIndex);
     }
-    return CompletableFuture.completedFuture(null);
+    return queue.add(request);
   }
 
   void update(ReplicationLevel replication, final long newIndex) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 9ac50129..266716b0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -208,6 +208,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
         throw e;
       }
       log.info("{}-Watch({}) returns {}", name, logIndex, reply);
+
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertTrue(reply.getLogIndex() >= logIndex);
       return reply;
     }
   }
@@ -272,10 +275,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
       final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       Assert.assertEquals(logIndex, watchReplies.logIndex);
       final RaftClientReply watchMajorityReply = watchReplies.getMajority();
-      Assert.assertTrue(watchMajorityReply.isSuccess());
 
       final RaftClientReply watchMajorityCommittedReply = watchReplies.getMajorityCommitted();
-      Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos();
         final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos);
@@ -299,10 +300,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
       final long logIndex = watchReplies.logIndex;
       LOG.info("checkAll {}: logIndex={}", i, logIndex);
       final RaftClientReply watchAllReply = watchReplies.getAll();
-      Assert.assertTrue(watchAllReply.isSuccess());
 
       final RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted();
-      Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos();
         final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos);

Reply via email to