This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7172da0  RATIS-1137. Rename StreamRequestTypeProto to 
MessageStreamRequestTypeProto (#261)
7172da0 is described below

commit 7172da0b965e295da815c2ce32deebe06c6e16ea
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Nov 7 20:41:40 2020 +0800

    RATIS-1137. Rename StreamRequestTypeProto to MessageStreamRequestTypeProto 
(#261)
    
    * RATIS-1137. Rename StreamRequestTypeProto to 
MessageStreamRequestTypeProto.
    
    * More renames.
---
 .../apache/ratis/client/impl/ClientProtoUtils.java |  8 +++----
 .../ratis/client/impl/MessageStreamImpl.java       |  8 +++----
 .../org/apache/ratis/client/impl/OrderedAsync.java |  2 +-
 .../apache/ratis/protocol/RaftClientRequest.java   | 27 +++++++++++-----------
 ratis-proto/src/main/proto/Raft.proto              |  4 ++--
 .../org/apache/ratis/server/impl/LeaderState.java  | 10 ++++----
 ...eamRequests.java => MessageStreamRequests.java} | 12 +++++-----
 .../apache/ratis/server/impl/RaftServerImpl.java   | 14 +++++------
 8 files changed, 43 insertions(+), 42 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 5b3cb7b..673d58f 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -87,8 +87,8 @@ public interface ClientProtoUtils {
     switch (p.getTypeCase()) {
       case WRITE:
         return RaftClientRequest.Type.valueOf(p.getWrite());
-      case STREAM:
-        return RaftClientRequest.Type.valueOf(p.getStream());
+      case MESSAGESTREAM:
+        return RaftClientRequest.Type.valueOf(p.getMessageStream());
       case READ:
         return RaftClientRequest.Type.valueOf(p.getRead());
       case STALEREAD:
@@ -127,8 +127,8 @@ public interface ClientProtoUtils {
       case WRITE:
         b.setWrite(type.getWrite());
         break;
-      case STREAM:
-        b.setStream(type.getStream());
+      case MESSAGESTREAM:
+        b.setMessageStream(type.getMessageStream());
         break;
       case READ:
         b.setRead(type.getRead());
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index 14779a1..d35bbd0 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -50,18 +50,18 @@ public final class MessageStreamImpl implements 
MessageStreamApi {
       this.id = id;
     }
 
-    private Type getStreamRequestType(boolean endOfRequest) {
-      return RaftClientRequest.streamRequestType(id, 
messageId.getAndIncrement(), endOfRequest);
+    private Type getMessageStreamRequestType(boolean endOfRequest) {
+      return RaftClientRequest.messageStreamRequestType(id, 
messageId.getAndIncrement(), endOfRequest);
     }
 
     @Override
     public CompletableFuture<RaftClientReply> sendAsync(Message message, 
boolean endOfRequest) {
-      return client.async().send(getStreamRequestType(endOfRequest), message, 
null);
+      return client.async().send(getMessageStreamRequestType(endOfRequest), 
message, null);
     }
 
     @Override
     public CompletableFuture<RaftClientReply> closeAsync() {
-      return client.async().send(getStreamRequestType(true), null, null);
+      return client.async().send(getMessageStreamRequestType(true), null, 
null);
     }
   }
 
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index bd14e23..57c4a8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -152,7 +152,7 @@ public final class OrderedAsync {
   }
 
   CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message 
message, RaftPeerId server) {
-    if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.STREAM)) {
+    if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) {
       Objects.requireNonNull(message, "message == null");
     }
     try {
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index caea2a2..9bc623f 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -40,8 +40,8 @@ public class RaftClientRequest extends RaftClientMessage {
     return WRITE_DEFAULT;
   }
 
-  public static Type streamRequestType(long streamId, long messageId, boolean 
endOfRequest) {
-    return new Type(StreamRequestTypeProto.newBuilder()
+  public static Type messageStreamRequestType(long streamId, long messageId, 
boolean endOfRequest) {
+    return new Type(MessageStreamRequestTypeProto.newBuilder()
         .setStreamId(streamId)
         .setMessageId(messageId)
         .setEndOfRequest(endOfRequest)
@@ -83,8 +83,9 @@ public class RaftClientRequest extends RaftClientMessage {
       return watchRequestType(watch.getIndex(), watch.getReplication());
     }
 
-    public static Type valueOf(StreamRequestTypeProto stream) {
-      return streamRequestType(stream.getStreamId(), stream.getMessageId(), 
stream.getEndOfRequest());
+    public static Type valueOf(MessageStreamRequestTypeProto messageStream) {
+      return messageStreamRequestType(
+          messageStream.getStreamId(), messageStream.getMessageId(), 
messageStream.getEndOfRequest());
     }
 
     /**
@@ -104,8 +105,8 @@ public class RaftClientRequest extends RaftClientMessage {
       this(WRITE, write);
     }
 
-    private Type(StreamRequestTypeProto stream) {
-      this(STREAM, stream);
+    private Type(MessageStreamRequestTypeProto messageStream) {
+      this(MESSAGESTREAM, messageStream);
     }
 
     private Type(ReadRequestTypeProto read) {
@@ -133,9 +134,9 @@ public class RaftClientRequest extends RaftClientMessage {
       return (WriteRequestTypeProto)proto;
     }
 
-    public StreamRequestTypeProto getStream() {
-      Preconditions.assertTrue(is(STREAM), () -> "proto = " + proto);
-      return (StreamRequestTypeProto)proto;
+    public MessageStreamRequestTypeProto getMessageStream() {
+      Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
+      return (MessageStreamRequestTypeProto)proto;
     }
 
     public ReadRequestTypeProto getRead() {
@@ -161,8 +162,8 @@ public class RaftClientRequest extends RaftClientMessage {
       return "Watch" + toString(w.getReplication()) + "(" + w.getIndex() + ")";
     }
 
-    public static String toString(StreamRequestTypeProto s) {
-      return "Stream" + s.getStreamId() + "-" + s.getMessageId() + 
(s.getEndOfRequest()? "-eor": "");
+    public static String toString(MessageStreamRequestTypeProto s) {
+      return "MessageStream" + s.getStreamId() + "-" + s.getMessageId() + 
(s.getEndOfRequest()? "-eor": "");
     }
 
     @Override
@@ -170,8 +171,8 @@ public class RaftClientRequest extends RaftClientMessage {
       switch (typeCase) {
         case WRITE:
           return "RW";
-        case STREAM:
-          return toString(getStream());
+        case MESSAGESTREAM:
+          return toString(getMessageStream());
         case READ:
           return "RO";
         case STALEREAD:
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index e167f1e..18e88f2 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -249,7 +249,7 @@ enum RaftPeerRole {
 message WriteRequestTypeProto {
 }
 
-message StreamRequestTypeProto {
+message MessageStreamRequestTypeProto {
   uint64 streamId = 1;  // the id of this stream
   uint64 messageId = 2; // the message id within a particular stream.
   bool endOfRequest = 3;// Is this the end-of-request?
@@ -277,7 +277,7 @@ message RaftClientRequestProto {
     ReadRequestTypeProto read = 4;
     StaleReadRequestTypeProto staleRead = 5;
     WatchRequestTypeProto watch = 6;
-    StreamRequestTypeProto stream = 7;
+    MessageStreamRequestTypeProto messageStream = 7;
   }
 }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 0791f83..fcf8996 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -225,7 +225,7 @@ public class LeaderState {
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
-  private final StreamRequests streamRequests;
+  private final MessageStreamRequests messageStreamRequests;
   private volatile boolean running = true;
 
   private final int stagingCatchupGap;
@@ -251,7 +251,7 @@ public class LeaderState {
     logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
     this.pendingRequests = new PendingRequests(server.getMemberId(), 
properties, raftServerMetrics);
     this.watchRequests = new WatchRequests(server.getMemberId(), properties);
-    this.streamRequests = new StreamRequests(server.getMemberId());
+    this.messageStreamRequests = new 
MessageStreamRequests(server.getMemberId());
 
     final RaftConfiguration conf = server.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
@@ -293,7 +293,7 @@ public class LeaderState {
     } catch (IOException e) {
       LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
     }
-    streamRequests.clear();
+    messageStreamRequests.clear();
     server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
@@ -357,13 +357,13 @@ public class LeaderState {
   }
 
   CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
-    return streamRequests.streamAsync(request)
+    return messageStreamRequests.streamAsync(request)
         .thenApply(dummy -> new RaftClientReply(request, 
server.getCommitInfos()))
         .exceptionally(e -> exception2RaftClientReply(request, e));
   }
 
   CompletableFuture<RaftClientRequest> 
streamEndOfRequestAsync(RaftClientRequest request) {
-    return streamRequests.streamEndOfRequestAsync(request)
+    return messageStreamRequests.streamEndOfRequestAsync(request)
         .thenApply(bytes -> RaftClientRequest.toWriteRequest(request, 
Message.valueOf(bytes)));
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StreamRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java
similarity index 91%
rename from 
ratis-server/src/main/java/org/apache/ratis/server/impl/StreamRequests.java
rename to 
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java
index 4bf137c..d3d670a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StreamRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.proto.RaftProtos.StreamRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.MessageStreamRequestTypeProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -33,8 +33,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-class StreamRequests {
-  public static final Logger LOG = 
LoggerFactory.getLogger(StreamRequests.class);
+class MessageStreamRequests {
+  public static final Logger LOG = 
LoggerFactory.getLogger(MessageStreamRequests.class);
 
   private static class Key {
     private final ClientId clientId;
@@ -112,12 +112,12 @@ class StreamRequests {
   private final String name;
   private final StreamMap streams = new StreamMap();
 
-  StreamRequests(Object name) {
+  MessageStreamRequests(Object name) {
     this.name = name + "-" + getClass().getSimpleName();
   }
 
   CompletableFuture<?> streamAsync(RaftClientRequest request) {
-    final StreamRequestTypeProto stream = request.getType().getStream();
+    final MessageStreamRequestTypeProto stream = 
request.getType().getMessageStream();
     Preconditions.assertTrue(!stream.getEndOfRequest());
     final Key key = new Key(request.getClientId(), stream.getStreamId());
     final PendingStream pending = streams.computeIfAbsent(key);
@@ -125,7 +125,7 @@ class StreamRequests {
   }
 
   CompletableFuture<ByteString> streamEndOfRequestAsync(RaftClientRequest 
request) {
-    final StreamRequestTypeProto stream = request.getType().getStream();
+    final MessageStreamRequestTypeProto stream = 
request.getType().getMessageStream();
     Preconditions.assertTrue(stream.getEndOfRequest());
     final Key key = new Key(request.getClientId(), stream.getStreamId());
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 92fda73..03f25aa 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -643,7 +643,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     CompletableFuture<RaftClientReply> replyFuture;
 
-    if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
+    if (request.is(TypeCase.STALEREAD)) {
       replyFuture = staleReadAsync(request);
     } else {
       // first check the server's leader state
@@ -654,8 +654,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
       // let the state machine handle read-only request from client
       RaftClientRequest.Type type = request.getType();
-      if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
-        if (type.getStream().getEndOfRequest()) {
+      if (type.is(TypeCase.MESSAGESTREAM)) {
+        if (type.getMessageStream().getEndOfRequest()) {
           final CompletableFuture<RaftClientRequest> f = 
streamEndOfRequestAsync(request);
           if (f.isCompletedExceptionally()) {
             return f.thenApply(r -> null);
@@ -665,13 +665,13 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         }
       }
 
-      if (type.is(RaftClientRequestProto.TypeCase.READ)) {
+      if (type.is(TypeCase.READ)) {
         // TODO: We might not be the leader anymore by the time this completes.
         // See the RAFT paper section 8 (last part)
         replyFuture = 
processQueryFuture(stateMachine.query(request.getMessage()), request);
-      } else if (type.is(RaftClientRequestProto.TypeCase.WATCH)) {
+      } else if (type.is(TypeCase.WATCH)) {
         replyFuture = watchAsync(request);
-      } else if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
+      } else if (type.is(TypeCase.MESSAGESTREAM)) {
         replyFuture = streamAsync(request);
       } else {
         // query the retry cache
@@ -721,7 +721,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       raftServerMetrics.onFailedClientWrite();
     } else if (type.is(TypeCase.READ)) {
       raftServerMetrics.onFailedClientRead();
-    } else if (type.is(TypeCase.STREAM)) {
+    } else if (type.is(TypeCase.MESSAGESTREAM)) {
       raftServerMetrics.onFailedClientStream();
     }
   }

Reply via email to