This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4692f3de1 RATIS-1936. Make the raft request configurable in Streaming 
(#968)
4692f3de1 is described below

commit 4692f3de108995ab5f5325a1bceb63bd025ac1fb
Author: hao guo <[email protected]>
AuthorDate: Tue Nov 21 03:24:46 2023 +0800

    RATIS-1936. Make the raft request configurable in Streaming (#968)
---
 .../org/apache/ratis/client/impl/DataStreamClientImpl.java | 14 +++++++++++++-
 .../src/main/java/org/apache/ratis/RaftConfigKeys.java     | 13 +++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 353f532c6..d184eb2ab 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -17,6 +17,7 @@
 */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.AsyncRpcApi;
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamClientRpc;
@@ -46,6 +47,8 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SlidingWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -61,6 +64,8 @@ import java.util.concurrent.CompletableFuture;
  * allows client to create streams and send asynchronously.
  */
 public class DataStreamClientImpl implements DataStreamClient {
+  public static final Logger LOG = 
LoggerFactory.getLogger(DataStreamClientImpl.class);
+
   private final RaftClient client;
   private final ClientId clientId;
   private final RaftGroupId groupId;
@@ -68,9 +73,11 @@ public class DataStreamClientImpl implements 
DataStreamClient {
   private final RaftPeer dataStreamServer;
   private final DataStreamClientRpc dataStreamClientRpc;
   private final OrderedStreamAsync orderedStreamAsync;
+  private final boolean skipSendForward;
 
   DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer 
dataStreamServer,
       DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+    this.skipSendForward = 
RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
     this.client = null;
     this.clientId = clientId;
     this.groupId = groupId;
@@ -81,6 +88,7 @@ public class DataStreamClientImpl implements DataStreamClient 
{
 
   DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
       DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+    this.skipSendForward = 
RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
     this.client = client;
     this.clientId = client.getId();
     this.groupId = client.getGroupId();
@@ -146,7 +154,11 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       }
       final CompletableFuture<DataStreamReply> f = 
combineHeader(send(Type.STREAM_DATA, data, length, options));
       if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
-        closeFuture = client != null? f.thenCompose(this::sendForward): f;
+        if (skipSendForward) {
+          closeFuture = f;
+        } else {
+          closeFuture = client != null? f.thenCompose(this::sendForward): f;
+        }
         closeFuture.thenApply(ClientProtoUtils::getRaftClientReply)
             .whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
       }
diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java 
b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
index 7c14a479f..a3f40ce91 100644
--- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
+++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
@@ -18,8 +18,10 @@
 package org.apache.ratis;
 
 import static org.apache.ratis.conf.ConfUtils.get;
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
 import static org.apache.ratis.conf.ConfUtils.printAll;
 import static org.apache.ratis.conf.ConfUtils.set;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
@@ -61,6 +63,17 @@ public interface RaftConfigKeys {
     static void setType(RaftProperties properties, SupportedDataStreamType 
type) {
       set(properties::set, TYPE_KEY, type.name());
     }
+
+    String SKIP_SEND_FORWARD_KEY = PREFIX + ".skip.send-forward";
+    boolean SKIP_SEND_FORWARD_DEFAULT = false;
+
+    static boolean skipSendForward(RaftProperties properties, Consumer<String> 
logger) {
+      return getBoolean(properties::getBoolean, SKIP_SEND_FORWARD_KEY, 
SKIP_SEND_FORWARD_DEFAULT, logger);
+    }
+
+    static void setSkipSendForward(RaftProperties properties, boolean 
skipSendForward) {
+      setBoolean(properties::setBoolean, SKIP_SEND_FORWARD_KEY, 
skipSendForward);
+    }
   }
 
   static void main(String[] args) {

Reply via email to