This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4692f3de1 RATIS-1936. Make the raft request configurable in Streaming (#968)
4692f3de1 is described below
commit 4692f3de108995ab5f5325a1bceb63bd025ac1fb
Author: hao guo <[email protected]>
AuthorDate: Tue Nov 21 03:24:46 2023 +0800
RATIS-1936. Make the raft request configurable in Streaming (#968)
---
.../org/apache/ratis/client/impl/DataStreamClientImpl.java | 14 +++++++++++++-
.../src/main/java/org/apache/ratis/RaftConfigKeys.java | 13 +++++++++++++
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 353f532c6..d184eb2ab 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
@@ -46,6 +47,8 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -61,6 +64,8 @@ import java.util.concurrent.CompletableFuture;
* allows client to create streams and send asynchronously.
*/
public class DataStreamClientImpl implements DataStreamClient {
+ public static final Logger LOG = LoggerFactory.getLogger(DataStreamClientImpl.class);
+
private final RaftClient client;
private final ClientId clientId;
private final RaftGroupId groupId;
@@ -68,9 +73,11 @@ public class DataStreamClientImpl implements DataStreamClient {
private final RaftPeer dataStreamServer;
private final DataStreamClientRpc dataStreamClientRpc;
private final OrderedStreamAsync orderedStreamAsync;
+ private final boolean skipSendForward;
DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+ this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
this.client = null;
this.clientId = clientId;
this.groupId = groupId;
@@ -81,6 +88,7 @@ public class DataStreamClientImpl implements DataStreamClient {
DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+ this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
this.client = client;
this.clientId = client.getId();
this.groupId = client.getGroupId();
@@ -146,7 +154,11 @@ public class DataStreamClientImpl implements DataStreamClient {
}
final CompletableFuture<DataStreamReply> f = combineHeader(send(Type.STREAM_DATA, data, length, options));
if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
- closeFuture = client != null? f.thenCompose(this::sendForward): f;
+ if (skipSendForward) {
+ closeFuture = f;
+ } else {
+ closeFuture = client != null? f.thenCompose(this::sendForward): f;
+ }
closeFuture.thenApply(ClientProtoUtils::getRaftClientReply)
.whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
index 7c14a479f..a3f40ce91 100644
--- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
+++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
@@ -18,8 +18,10 @@
package org.apache.ratis;
import static org.apache.ratis.conf.ConfUtils.get;
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
import static org.apache.ratis.conf.ConfUtils.printAll;
import static org.apache.ratis.conf.ConfUtils.set;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
@@ -61,6 +63,17 @@ public interface RaftConfigKeys {
static void setType(RaftProperties properties, SupportedDataStreamType type) {
set(properties::set, TYPE_KEY, type.name());
}
+
+ String SKIP_SEND_FORWARD_KEY = PREFIX + ".skip.send-forward";
+ boolean SKIP_SEND_FORWARD_DEFAULT = false;
+
+ static boolean skipSendForward(RaftProperties properties, Consumer<String> logger) {
+ return getBoolean(properties::getBoolean, SKIP_SEND_FORWARD_KEY, SKIP_SEND_FORWARD_DEFAULT, logger);
+ }
+
+ static void setSkipSendForward(RaftProperties properties, boolean skipSendForward) {
+ setBoolean(properties::setBoolean, SKIP_SEND_FORWARD_KEY, skipSendForward);
+ }
}
static void main(String[] args) {