This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7172da0 RATIS-1137. Rename StreamRequestTypeProto to
MessageStreamRequestTypeProto (#261)
7172da0 is described below
commit 7172da0b965e295da815c2ce32deebe06c6e16ea
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Nov 7 20:41:40 2020 +0800
RATIS-1137. Rename StreamRequestTypeProto to MessageStreamRequestTypeProto
(#261)
* RATIS-1137. Rename StreamRequestTypeProto to
MessageStreamRequestTypeProto.
* More renames.
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 8 +++----
.../ratis/client/impl/MessageStreamImpl.java | 8 +++----
.../org/apache/ratis/client/impl/OrderedAsync.java | 2 +-
.../apache/ratis/protocol/RaftClientRequest.java | 27 +++++++++++-----------
ratis-proto/src/main/proto/Raft.proto | 4 ++--
.../org/apache/ratis/server/impl/LeaderState.java | 10 ++++----
...eamRequests.java => MessageStreamRequests.java} | 12 +++++-----
.../apache/ratis/server/impl/RaftServerImpl.java | 14 +++++------
8 files changed, 43 insertions(+), 42 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 5b3cb7b..673d58f 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -87,8 +87,8 @@ public interface ClientProtoUtils {
switch (p.getTypeCase()) {
case WRITE:
return RaftClientRequest.Type.valueOf(p.getWrite());
- case STREAM:
- return RaftClientRequest.Type.valueOf(p.getStream());
+ case MESSAGESTREAM:
+ return RaftClientRequest.Type.valueOf(p.getMessageStream());
case READ:
return RaftClientRequest.Type.valueOf(p.getRead());
case STALEREAD:
@@ -127,8 +127,8 @@ public interface ClientProtoUtils {
case WRITE:
b.setWrite(type.getWrite());
break;
- case STREAM:
- b.setStream(type.getStream());
+ case MESSAGESTREAM:
+ b.setMessageStream(type.getMessageStream());
break;
case READ:
b.setRead(type.getRead());
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index 14779a1..d35bbd0 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -50,18 +50,18 @@ public final class MessageStreamImpl implements
MessageStreamApi {
this.id = id;
}
- private Type getStreamRequestType(boolean endOfRequest) {
- return RaftClientRequest.streamRequestType(id,
messageId.getAndIncrement(), endOfRequest);
+ private Type getMessageStreamRequestType(boolean endOfRequest) {
+ return RaftClientRequest.messageStreamRequestType(id,
messageId.getAndIncrement(), endOfRequest);
}
@Override
public CompletableFuture<RaftClientReply> sendAsync(Message message,
boolean endOfRequest) {
- return client.async().send(getStreamRequestType(endOfRequest), message,
null);
+ return client.async().send(getMessageStreamRequestType(endOfRequest),
message, null);
}
@Override
public CompletableFuture<RaftClientReply> closeAsync() {
- return client.async().send(getStreamRequestType(true), null, null);
+ return client.async().send(getMessageStreamRequestType(true), null,
null);
}
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index bd14e23..57c4a8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -152,7 +152,7 @@ public final class OrderedAsync {
}
CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message
message, RaftPeerId server) {
- if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.STREAM)) {
+ if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) {
Objects.requireNonNull(message, "message == null");
}
try {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index caea2a2..9bc623f 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -40,8 +40,8 @@ public class RaftClientRequest extends RaftClientMessage {
return WRITE_DEFAULT;
}
- public static Type streamRequestType(long streamId, long messageId, boolean
endOfRequest) {
- return new Type(StreamRequestTypeProto.newBuilder()
+ public static Type messageStreamRequestType(long streamId, long messageId,
boolean endOfRequest) {
+ return new Type(MessageStreamRequestTypeProto.newBuilder()
.setStreamId(streamId)
.setMessageId(messageId)
.setEndOfRequest(endOfRequest)
@@ -83,8 +83,9 @@ public class RaftClientRequest extends RaftClientMessage {
return watchRequestType(watch.getIndex(), watch.getReplication());
}
- public static Type valueOf(StreamRequestTypeProto stream) {
- return streamRequestType(stream.getStreamId(), stream.getMessageId(),
stream.getEndOfRequest());
+ public static Type valueOf(MessageStreamRequestTypeProto messageStream) {
+ return messageStreamRequestType(
+ messageStream.getStreamId(), messageStream.getMessageId(),
messageStream.getEndOfRequest());
}
/**
@@ -104,8 +105,8 @@ public class RaftClientRequest extends RaftClientMessage {
this(WRITE, write);
}
- private Type(StreamRequestTypeProto stream) {
- this(STREAM, stream);
+ private Type(MessageStreamRequestTypeProto messageStream) {
+ this(MESSAGESTREAM, messageStream);
}
private Type(ReadRequestTypeProto read) {
@@ -133,9 +134,9 @@ public class RaftClientRequest extends RaftClientMessage {
return (WriteRequestTypeProto)proto;
}
- public StreamRequestTypeProto getStream() {
- Preconditions.assertTrue(is(STREAM), () -> "proto = " + proto);
- return (StreamRequestTypeProto)proto;
+ public MessageStreamRequestTypeProto getMessageStream() {
+ Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
+ return (MessageStreamRequestTypeProto)proto;
}
public ReadRequestTypeProto getRead() {
@@ -161,8 +162,8 @@ public class RaftClientRequest extends RaftClientMessage {
return "Watch" + toString(w.getReplication()) + "(" + w.getIndex() + ")";
}
- public static String toString(StreamRequestTypeProto s) {
- return "Stream" + s.getStreamId() + "-" + s.getMessageId() +
(s.getEndOfRequest()? "-eor": "");
+ public static String toString(MessageStreamRequestTypeProto s) {
+ return "MessageStream" + s.getStreamId() + "-" + s.getMessageId() +
(s.getEndOfRequest()? "-eor": "");
}
@Override
@@ -170,8 +171,8 @@ public class RaftClientRequest extends RaftClientMessage {
switch (typeCase) {
case WRITE:
return "RW";
- case STREAM:
- return toString(getStream());
+ case MESSAGESTREAM:
+ return toString(getMessageStream());
case READ:
return "RO";
case STALEREAD:
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index e167f1e..18e88f2 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -249,7 +249,7 @@ enum RaftPeerRole {
message WriteRequestTypeProto {
}
-message StreamRequestTypeProto {
+message MessageStreamRequestTypeProto {
uint64 streamId = 1; // the id of this stream
uint64 messageId = 2; // the message id within a particular stream.
bool endOfRequest = 3;// Is this the end-of-request?
@@ -277,7 +277,7 @@ message RaftClientRequestProto {
ReadRequestTypeProto read = 4;
StaleReadRequestTypeProto staleRead = 5;
WatchRequestTypeProto watch = 6;
- StreamRequestTypeProto stream = 7;
+ MessageStreamRequestTypeProto messageStream = 7;
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 0791f83..fcf8996 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -225,7 +225,7 @@ public class LeaderState {
private final EventProcessor processor;
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
- private final StreamRequests streamRequests;
+ private final MessageStreamRequests messageStreamRequests;
private volatile boolean running = true;
private final int stagingCatchupGap;
@@ -251,7 +251,7 @@ public class LeaderState {
logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
this.pendingRequests = new PendingRequests(server.getMemberId(),
properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
- this.streamRequests = new StreamRequests(server.getMemberId());
+ this.messageStreamRequests = new
MessageStreamRequests(server.getMemberId());
final RaftConfiguration conf = server.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
@@ -293,7 +293,7 @@ public class LeaderState {
} catch (IOException e) {
LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
}
- streamRequests.clear();
+ messageStreamRequests.clear();
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
@@ -357,13 +357,13 @@ public class LeaderState {
}
CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
- return streamRequests.streamAsync(request)
+ return messageStreamRequests.streamAsync(request)
.thenApply(dummy -> new RaftClientReply(request,
server.getCommitInfos()))
.exceptionally(e -> exception2RaftClientReply(request, e));
}
CompletableFuture<RaftClientRequest>
streamEndOfRequestAsync(RaftClientRequest request) {
- return streamRequests.streamEndOfRequestAsync(request)
+ return messageStreamRequests.streamEndOfRequestAsync(request)
.thenApply(bytes -> RaftClientRequest.toWriteRequest(request,
Message.valueOf(bytes)));
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StreamRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java
similarity index 91%
rename from
ratis-server/src/main/java/org/apache/ratis/server/impl/StreamRequests.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java
index 4bf137c..d3d670a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StreamRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.proto.RaftProtos.StreamRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.MessageStreamRequestTypeProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -33,8 +33,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-class StreamRequests {
- public static final Logger LOG =
LoggerFactory.getLogger(StreamRequests.class);
+class MessageStreamRequests {
+ public static final Logger LOG =
LoggerFactory.getLogger(MessageStreamRequests.class);
private static class Key {
private final ClientId clientId;
@@ -112,12 +112,12 @@ class StreamRequests {
private final String name;
private final StreamMap streams = new StreamMap();
- StreamRequests(Object name) {
+ MessageStreamRequests(Object name) {
this.name = name + "-" + getClass().getSimpleName();
}
CompletableFuture<?> streamAsync(RaftClientRequest request) {
- final StreamRequestTypeProto stream = request.getType().getStream();
+ final MessageStreamRequestTypeProto stream =
request.getType().getMessageStream();
Preconditions.assertTrue(!stream.getEndOfRequest());
final Key key = new Key(request.getClientId(), stream.getStreamId());
final PendingStream pending = streams.computeIfAbsent(key);
@@ -125,7 +125,7 @@ class StreamRequests {
}
CompletableFuture<ByteString> streamEndOfRequestAsync(RaftClientRequest
request) {
- final StreamRequestTypeProto stream = request.getType().getStream();
+ final MessageStreamRequestTypeProto stream =
request.getType().getMessageStream();
Preconditions.assertTrue(stream.getEndOfRequest());
final Key key = new Key(request.getClientId(), stream.getStreamId());
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 92fda73..03f25aa 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -643,7 +643,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
CompletableFuture<RaftClientReply> replyFuture;
- if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
+ if (request.is(TypeCase.STALEREAD)) {
replyFuture = staleReadAsync(request);
} else {
// first check the server's leader state
@@ -654,8 +654,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
// let the state machine handle read-only request from client
RaftClientRequest.Type type = request.getType();
- if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
- if (type.getStream().getEndOfRequest()) {
+ if (type.is(TypeCase.MESSAGESTREAM)) {
+ if (type.getMessageStream().getEndOfRequest()) {
final CompletableFuture<RaftClientRequest> f =
streamEndOfRequestAsync(request);
if (f.isCompletedExceptionally()) {
return f.thenApply(r -> null);
@@ -665,13 +665,13 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
}
- if (type.is(RaftClientRequestProto.TypeCase.READ)) {
+ if (type.is(TypeCase.READ)) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
replyFuture =
processQueryFuture(stateMachine.query(request.getMessage()), request);
- } else if (type.is(RaftClientRequestProto.TypeCase.WATCH)) {
+ } else if (type.is(TypeCase.WATCH)) {
replyFuture = watchAsync(request);
- } else if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
+ } else if (type.is(TypeCase.MESSAGESTREAM)) {
replyFuture = streamAsync(request);
} else {
// query the retry cache
@@ -721,7 +721,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
raftServerMetrics.onFailedClientWrite();
} else if (type.is(TypeCase.READ)) {
raftServerMetrics.onFailedClientRead();
- } else if (type.is(TypeCase.STREAM)) {
+ } else if (type.is(TypeCase.MESSAGESTREAM)) {
raftServerMetrics.onFailedClientStream();
}
}