This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c1032750 RATIS-1590. Include the new log index in Watch reply. (#657)
c1032750 is described below
commit c1032750ae2d28b7504a2a9f6b917e290927b576
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jun 10 03:00:49 2022 -0700
RATIS-1590. Include the new log index in Watch reply. (#657)
---
.../java/org/apache/ratis/client/api/AsyncApi.java | 3 +++
.../org/apache/ratis/client/api/BlockingApi.java | 3 +++
.../main/java/org/apache/ratis/util/IOUtils.java | 17 +++++++++++--
.../apache/ratis/grpc/client/GrpcClientRpc.java | 2 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 2 +-
.../apache/ratis/server/impl/WatchRequests.java | 29 ++++++++++------------
.../java/org/apache/ratis/WatchRequestTests.java | 7 +++---
7 files changed, 39 insertions(+), 24 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index fd000ad2..1e8da9bd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -66,6 +66,9 @@ public interface AsyncApi {
* @param index The log index to be watched.
* @param replication The replication level required.
* @return a future of the reply.
+ * When {@link RaftClientReply#isSuccess()} == true,
+ * the reply index (i.e. {@link RaftClientReply#getLogIndex()}) is
the log index satisfying the request,
+ * where reply index >= watch index.
*/
CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel
replication);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index e1679b97..84f9740e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -66,6 +66,9 @@ public interface BlockingApi {
* @param index The log index to be watched.
* @param replication The replication level required.
* @return the reply.
+ * When {@link RaftClientReply#isSuccess()} == true,
+ * the reply index (i.e. {@link RaftClientReply#getLogIndex()}) is
the log index satisfying the request,
+ * where reply index >= watch index.
*/
RaftClientReply watch(long index, ReplicationLevel replication) throws
IOException;
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index bd50a87a..0153ac49 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -214,12 +214,25 @@ public interface IOUtils {
return readObject(new ByteArrayInputStream(bytes), clazz);
}
+ /**
+ * Read an object from the given input stream.
+ *
+ * @param in input stream to read from.
+ * @param clazz the class of the object.
+ * @return the object read from the stream.
+ *
+ * @param <T> The class type.
+ */
static <T> T readObject(InputStream in, Class<T> clazz) {
- Object obj = null;
+ final Object obj;
try(ObjectInputStream oin = new ObjectInputStream(in)) {
obj = oin.readObject();
- return clazz.cast(obj);
} catch (IOException | ClassNotFoundException e) {
+ throw new IllegalStateException("Failed to readObject for class " +
clazz, e);
+ }
+ try {
+ return clazz.cast(obj);
+ } catch (ClassCastException e) {
throw new IllegalStateException("Failed to cast to " + clazz + ",
object="
+ (obj instanceof Throwable?
StringUtils.stringifyException((Throwable) obj): obj), e);
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 8d62fdb1..b825429a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -82,7 +82,7 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
// Reuse the same grpc stream for all async calls.
return proxy.getUnorderedAsyncStreamObservers().onNext(request);
} catch (Exception e) {
- LOG.error(clientId + ": XXX Failed " + request, e);
+ LOG.error(clientId + ": Failed " + request, e);
return JavaUtils.completeExceptionally(e);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 068c7e5c..7265f3a8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -419,7 +419,7 @@ class LeaderStateImpl implements LeaderState {
CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest
request) {
LOG.debug("{}: addWatchRequest {}", this, request);
return watchRequests.add(request)
- .thenApply(v -> server.newSuccessReply(request))
+ .thenApply(logIndex -> server.newSuccessReply(request, logIndex))
.exceptionally(e -> exception2RaftClientReply(request, e));
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 4d3b985d..3b95d4bf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.server.impl;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
@@ -45,14 +44,14 @@ class WatchRequests {
static class PendingWatch {
private final WatchRequestTypeProto watch;
private final Timestamp creationTime;
- private final Supplier<CompletableFuture<Void>> future =
JavaUtils.memoize(CompletableFuture::new);
+ private final Supplier<CompletableFuture<Long>> future =
JavaUtils.memoize(CompletableFuture::new);
PendingWatch(WatchRequestTypeProto watch, Timestamp creationTime) {
this.watch = watch;
this.creationTime = creationTime;
}
- CompletableFuture<Void> getFuture() {
+ CompletableFuture<Long> getFuture() {
return future.get();
}
@@ -87,16 +86,17 @@ class WatchRequests {
return index;
}
- CompletableFuture<Void> add(RaftClientRequest request) {
+ CompletableFuture<Long> add(RaftClientRequest request) {
final long currentTime = Timestamp.currentTimeNanos();
final long roundUp =
watchTimeoutDenominationNanos.roundUpNanos(currentTime);
final PendingWatch pending = new
PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp));
final PendingWatch computed;
synchronized (this) {
- if (pending.getIndex() <= getIndex()) { // compare again synchronized
+ final long queueIndex = getIndex();
+ if (pending.getIndex() <= queueIndex) { // compare again synchronized
// watch condition already satisfied
- return null;
+ return CompletableFuture.completedFuture(queueIndex);
}
computed = q.compute(pending, (key, old) -> old != null? old:
resource.tryAcquire()? pending: null);
}
@@ -136,7 +136,6 @@ class WatchRequests {
return true;
}
- @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
synchronized void updateIndex(final long newIndex) {
if (newIndex <= getIndex()) { // compare again synchronized
return;
@@ -152,7 +151,7 @@ class WatchRequests {
final boolean removed = removeExisting(first);
Preconditions.assertTrue(removed);
LOG.debug("{}: complete {}", name, first);
- first.getFuture().complete(null);
+ first.getFuture().complete(newIndex);
}
}
@@ -187,17 +186,15 @@ class WatchRequests {
Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new
WatchQueue(r, elementLimit)));
}
- @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
- CompletableFuture<Void> add(RaftClientRequest request) {
+ CompletableFuture<Long> add(RaftClientRequest request) {
final WatchRequestTypeProto watch = request.getType().getWatch();
final WatchQueue queue = queues.get(watch.getReplication());
- if (watch.getIndex() > queue.getIndex()) { // compare without
synchronization
- final CompletableFuture<Void> future = queue.add(request);
- if (future != null) {
- return future;
- }
+ final long queueIndex = queue.getIndex();
+ if (watch.getIndex() <= queueIndex) { // compare without synchronization
+ // watch condition already satisfied
+ return CompletableFuture.completedFuture(queueIndex);
}
- return CompletableFuture.completedFuture(null);
+ return queue.add(request);
}
void update(ReplicationLevel replication, final long newIndex) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 9ac50129..266716b0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -208,6 +208,9 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
throw e;
}
log.info("{}-Watch({}) returns {}", name, logIndex, reply);
+
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertTrue(reply.getLogIndex() >= logIndex);
return reply;
}
}
@@ -272,10 +275,8 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND,
TimeUnit.SECONDS);
Assert.assertEquals(logIndex, watchReplies.logIndex);
final RaftClientReply watchMajorityReply = watchReplies.getMajority();
- Assert.assertTrue(watchMajorityReply.isSuccess());
final RaftClientReply watchMajorityCommittedReply =
watchReplies.getMajorityCommitted();
- Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
{ // check commit infos
final Collection<CommitInfoProto> commitInfos =
watchMajorityCommittedReply.getCommitInfos();
final String message = "logIndex=" + logIndex + ", " +
ProtoUtils.toString(commitInfos);
@@ -299,10 +300,8 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
final long logIndex = watchReplies.logIndex;
LOG.info("checkAll {}: logIndex={}", i, logIndex);
final RaftClientReply watchAllReply = watchReplies.getAll();
- Assert.assertTrue(watchAllReply.isSuccess());
final RaftClientReply watchAllCommittedReply =
watchReplies.getAllCommitted();
- Assert.assertTrue(watchAllCommittedReply.isSuccess());
{ // check commit infos
final Collection<CommitInfoProto> commitInfos =
watchAllCommittedReply.getCommitInfos();
final String message = "logIndex=" + logIndex + ", " +
ProtoUtils.toString(commitInfos);