This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 865a6ca RATIS-1117. Remove DataStreamClient.start() and
DataStreamClientRpc.startClient(). (#239)
865a6ca is described below
commit 865a6cab64313e6702d557c385ecd328abd43692
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 30 09:55:05 2020 +0800
RATIS-1117. Remove DataStreamClient.start() and
DataStreamClientRpc.startClient(). (#239)
---
.../org/apache/ratis/client/DataStreamClient.java | 7 +--
.../apache/ratis/client/DataStreamClientRpc.java | 12 +++--
.../client/DisabledDataStreamClientFactory.java | 5 +--
.../ratis/client/impl/DataStreamClientImpl.java | 14 ++----
.../ratis/netty/client/NettyClientStreamRpc.java | 52 +++++++++++-----------
.../ratis/netty/server/NettyServerStreamRpc.java | 4 +-
.../ratis/datastream/TestDataStreamBase.java | 1 -
7 files changed, 36 insertions(+), 59 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index 9122763..cc888c7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -28,19 +28,14 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
/**
- * A client interface that sends request to the streaming pipeline.
- * Associated with it will be a Netty Client.
+ * A user interface extending {@link DataStreamApi}.
*/
public interface DataStreamClient extends DataStreamApi, Closeable {
-
Logger LOG = LoggerFactory.getLogger(DataStreamClient.class);
/** Return the rpc client instance **/
DataStreamClientRpc getClientRpc();
- /** start the client */
- void start();
-
static Builder newBuilder() {
return new Builder();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
index 4db1c55..a9bcd9d 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
@@ -22,20 +22,18 @@ import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.util.JavaUtils;
+import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
/**
- * An api interface for to stream from client to server.
+ * A client interface for sending stream requests.
+ * The underlying implementation is pluggable, depending on the {@link
org.apache.ratis.datastream.DataStreamType}.
+ * The implementations of this interface define how the requests are
transported to the server.
*/
-public interface DataStreamClientRpc {
-
+public interface DataStreamClientRpc extends Closeable {
/** Async call to send a request. */
default CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest
request) {
throw new UnsupportedOperationException(getClass() + " does not support "
+ JavaUtils.getCurrentStackTraceElement().getMethodName());
}
-
- void startClient();
-
- void closeClient();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
index 1347e7b..7150228 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
@@ -35,10 +35,7 @@ public class DisabledDataStreamClientFactory implements
DataStreamClientFactory
public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server,
RaftProperties properties) {
return new DataStreamClientRpc() {
@Override
- public void startClient() {}
-
- @Override
- public void closeClient() {}
+ public void close() {}
};
}
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 9316c62..506e25d 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -31,9 +31,8 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -44,8 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* allows client to create streams and send asynchronously.
*/
public class DataStreamClientImpl implements DataStreamClient {
- public static final Logger LOG =
LoggerFactory.getLogger(DataStreamClientImpl.class);
-
// TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup
in constructor.
private final ClientId clientId = ClientId.randomId();
private final RaftGroupId groupId = RaftGroupId.randomId();
@@ -117,12 +114,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
}
@Override
- public void close(){
- dataStreamClientRpc.closeClient();
- }
-
- @Override
- public void start(){
- dataStreamClientRpc.startClient();
+ public void close() throws IOException {
+ dataStreamClientRpc.close();
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 5478880..e769e2c 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -33,28 +33,40 @@ import
org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyClientStreamRpc.class);
- private RaftPeer server;
+ private final RaftPeer server;
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
- private Channel channel;
- private Queue<CompletableFuture<DataStreamReply>> replies
- = new LinkedList<>();
+ private final Supplier<Channel> channel;
+ private final Queue<CompletableFuture<DataStreamReply>> replies = new
LinkedList<>();
public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
this.server = server;
+
+ final ChannelFuture f = new Bootstrap()
+ .group(workerGroup)
+ .channel(NioSocketChannel.class)
+ .handler(getInitializer())
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .connect(NetUtils.createSocketAddr(server.getAddress()));
+ this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
+ }
+
+ private Channel getChannel() {
+ return channel.get();
}
synchronized CompletableFuture<DataStreamReply> pollReply() {
@@ -64,7 +76,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private ChannelInboundHandler getClientHandler(){
return new ChannelInboundHandlerAdapter(){
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws
InterruptedException {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
final DataStreamReply reply = (DataStreamReply) msg;
pollReply().complete(reply);
}
@@ -74,8 +86,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private ChannelInitializer<SocketChannel> getInitializer(){
return new ChannelInitializer<SocketChannel>(){
@Override
- public void initChannel(SocketChannel ch)
- throws Exception {
+ public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(newEncoder());
p.addLast(newDecoder());
@@ -110,31 +121,18 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
public synchronized CompletableFuture<DataStreamReply>
streamAsync(DataStreamRequest request) {
CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
replies.offer(f);
- channel.writeAndFlush(request);
+ getChannel().writeAndFlush(request);
return f;
}
@Override
- public void startClient() {
- final InetSocketAddress address =
NetUtils.createSocketAddr(server.getAddress());
- try {
- channel = (new Bootstrap())
- .group(workerGroup)
- .channel(NioSocketChannel.class)
- .handler(getInitializer())
- .option(ChannelOption.SO_KEEPALIVE, true)
- .connect(address)
- .sync()
- .channel();
- System.out.println(channel);
- } catch (Exception e){
- LOG.info("Exception {}", e.getCause());
- }
+ public void close() {
+ getChannel().close().syncUninterruptibly();
+ workerGroup.shutdownGracefully();
}
@Override
- public void closeClient(){
- channel.close().syncUninterruptibly();
- workerGroup.shutdownGracefully();
+ public String toString() {
+ return getClass().getSimpleName() + "->" + server;
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 9797709..4743fcd 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -177,12 +177,10 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
- final DataStreamClient client = DataStreamClient.newBuilder()
+ return DataStreamClient.newBuilder()
.setRaftServer(peer)
.setProperties(properties)
.build();
- client.start();
- return client;
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
index d14b737..01b9fae 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
@@ -144,7 +144,6 @@ class TestDataStreamBase extends BaseTest {
protected void setupClient(){
client = new DataStreamClientImpl(peers.get(0), properties, null);
- client.start();
}
protected void shutdown() throws IOException {