This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1af0980 RATIS-1030: Implement ratis streaming using netty - Client
code (#179). Contributed by Ansh Khanna
1af0980 is described below
commit 1af09806518c40a401948af5352a3192f183c8d3
Author: anshkhannasbu <[email protected]>
AuthorDate: Thu Aug 13 15:56:42 2020 -0400
RATIS-1030: Implement ratis streaming using netty - Client code (#179).
Contributed by Ansh Khanna
---
.../org/apache/ratis/client/DataStreamClient.java | 46 ++----
.../ratis/client/DataStreamClientFactory.java | 4 +-
.../apache/ratis/client/DataStreamClientRpc.java | 3 +
.../ratis/client/impl/DataStreamClientImpl.java | 94 +++++++++---
.../ratis/client/impl/OrderedStreamAsync.java | 160 +++++++++++++++++++++
.../apache/ratis/protocol/DataStreamMessage.java | 2 +-
...StreamMessage.java => DataStreamReplyImpl.java} | 40 +++++-
...amMessage.java => DataStreamRequestClient.java} | 38 ++++-
.../ratis/protocol/DataStreamRequestServer.java | 53 +++++++
.../apache/ratis/netty/NettyDataStreamFactory.java | 10 +-
.../ratis/netty/client/NettyClientStreamRpc.java | 116 +++++++++++++++
.../netty/decoders/DataStreamReplyDecoder.java | 53 +++++++
.../netty/decoders/DataStreamRequestDecoder.java | 59 ++++++++
.../netty/encoders/DataStreamReplyEncoder.java | 44 ++++++
.../netty/encoders/DataStreamRequestEncoder.java | 42 ++++++
.../org/apache/ratis/server/DataStreamServer.java | 18 +--
.../ratis/server/DataStreamServerFactory.java | 14 +-
.../apache/ratis/server/DataStreamServerRpc.java | 10 ++
18 files changed, 728 insertions(+), 78 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index 8e3f7ae..ae94aab 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -17,13 +17,11 @@
*/
package org.apache.ratis.client;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.api.DataStreamApi;
+import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.SupportedDataStreamType;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,42 +33,32 @@ public interface DataStreamClient {
Logger LOG = LoggerFactory.getLogger(DataStreamClient.class);
- /** Return Client id. */
- ClientId getId();
+ /** Return the rpc client instance **/
+ DataStreamClientRpc getClientRpc();
/** Return Streamer Api instance. */
- DataStreamApi getDataStreamApi();
+ DataStreamOutput stream();
- /**
- * send to server via streaming.
- * Return a completable future.
- */
+ /** add information of the raft peers to communicate with */
+ void addPeers(Iterable<RaftPeer> peers);
+
+ /** close the client */
+ void close();
/** To build {@link DataStreamClient} objects */
class Builder {
- private ClientId clientId;
- private DataStreamClientRpc dataStreamClientRpc;
+ private RaftPeer raftServer;
private RaftProperties properties;
private Parameters parameters;
private Builder() {}
public DataStreamClientImpl build(){
- if (clientId == null) {
- clientId = ClientId.randomId();
- }
- if (properties != null) {
- if (dataStreamClientRpc == null) {
- final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
- dataStreamClientRpc =
DataStreamClientFactory.cast(type.newFactory(parameters))
- .newDataStreamClientRpc(clientId, properties);
- }
- }
- return new DataStreamClientImpl(clientId, properties,
dataStreamClientRpc);
+ return new DataStreamClientImpl(raftServer, properties, parameters);
}
- public Builder setClientId(ClientId clientId) {
- this.clientId = clientId;
+ public Builder setRaftServer(RaftPeer peer) {
+ this.raftServer = peer;
return this;
}
@@ -79,15 +67,9 @@ public interface DataStreamClient {
return this;
}
- public Builder setDataStreamClientRpc(DataStreamClientRpc
dataStreamClientRpc){
- this.dataStreamClientRpc = dataStreamClientRpc;
- return this;
- }
-
public Builder setProperties(RaftProperties properties) {
this.properties = properties;
return this;
}
}
-
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
index f5b00e0..c77fb7a 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
@@ -20,7 +20,7 @@ package org.apache.ratis.client;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeer;
/**
* A factory to create streaming client.
@@ -36,5 +36,5 @@ public interface DataStreamClientFactory extends
DataStreamFactory {
+ "; stream type is " + dataStreamFactory.getDataStreamType());
}
- DataStreamClientRpc newDataStreamClientRpc(ClientId clientId, RaftProperties
properties);
+ DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties
properties);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
index 7021de9..4db1c55 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
@@ -35,4 +35,7 @@ public interface DataStreamClientRpc {
+ JavaUtils.getCurrentStackTraceElement().getMethodName());
}
+ void startClient();
+
+ void closeClient();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index fff816c..2afc42a 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -14,38 +14,100 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.ratis.client.impl;
-import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DataStreamClient;
-import org.apache.ratis.client.api.DataStreamApi;
+import org.apache.ratis.client.DataStreamClientFactory;
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Streaming client implementation
+ * allows client to create streams and send asynchronously.
+ */
public class DataStreamClientImpl implements DataStreamClient {
+ public static final Logger LOG =
LoggerFactory.getLogger(DataStreamClientImpl.class);
- private final ClientId clientId;
- private final RaftProperties properties;
- private final DataStreamClientRpc dataStreamClientRpc;
- private final DataStreamApi dataStreamApi = null; //TODO need an impl
+ private DataStreamClientRpc dataStreamClientRpc;
+ private OrderedStreamAsync orderedStreamAsync;
+ private RaftPeer raftServer;
+ private RaftProperties properties;
+ private Parameters parameters;
+ private long streamId = 0;
- public DataStreamClientImpl(ClientId clientId,
+ public DataStreamClientImpl(RaftPeer raftServer,
RaftProperties properties,
- DataStreamClientRpc dataStreamClientRpc){
- this.clientId = clientId;
+ Parameters parameters) {
+ this.raftServer = Objects.requireNonNull(raftServer,
+ "peer == null");
this.properties = properties;
- this.dataStreamClientRpc = dataStreamClientRpc;
+ this.parameters = parameters;
+
+ final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
+ this.dataStreamClientRpc =
DataStreamClientFactory.cast(type.newFactory(parameters))
+ .newDataStreamClientRpc(raftServer, properties);
+
+ this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
+ }
+
+ class DataStreamOutputImpl implements DataStreamOutput {
+ private long streamId = 0;
+ private long messageId = 0;
+
+ public DataStreamOutputImpl(long id){
+ this.streamId = id;
+ }
+
+ // send to the attached dataStreamClientRpc
+ @Override
+ public CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf) {
+ messageId++;
+ return orderedStreamAsync.sendRequest(streamId, messageId, buf);
+ }
+
+ // should wait for attached sliding window to terminate
+ @Override
+ public CompletableFuture<DataStreamReply> closeAsync() {
+ return null;
+ }
+ }
+
+ @Override
+ public DataStreamClientRpc getClientRpc() {
+ return dataStreamClientRpc;
}
@Override
- public ClientId getId() {
- return clientId;
+ public DataStreamOutput stream() {
+ streamId++;
+ return new DataStreamOutputImpl(streamId);
}
@Override
- public DataStreamApi getDataStreamApi(){
- return dataStreamApi;
+ public void addPeers(Iterable<RaftPeer> peers) {
+ return;
}
+ @Override
+ public void close(){
+ dataStreamClientRpc.closeClient();
+ }
+
+ public void start(){
+ dataStreamClientRpc.startClient();
+ }
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
new file mode 100644
index 0000000..34ba99e
--- /dev/null
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SlidingWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.function.LongFunction;
+
+public class OrderedStreamAsync {
+ public static final Logger LOG =
LoggerFactory.getLogger(OrderedStreamAsync.class);
+
+ public static class DataStreamWindowRequest implements DataStreamRequest,
+ SlidingWindow.ClientSideRequest<DataStreamReply> {
+ private final long streamId;
+ private final long messageId;
+ private final long seqNum;
+ private final ByteBuffer data;
+ private boolean isFirst = false;
+ private CompletableFuture<DataStreamReply> replyFuture = new
CompletableFuture<>();
+
+ public DataStreamRequestClient newDataStreamRequest(){
+ return new DataStreamRequestClient(streamId, messageId, data.slice());
+ }
+
+ @Override
+ public long getStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public long getDataOffset() {
+ return messageId;
+ }
+
+ @Override
+ public long getDataLength() {
+ return data.capacity();
+ }
+
+ @Override
+ public void setFirstRequest() {
+ isFirst = true;
+ }
+
+ @Override
+ public long getSeqNum() {
+ return seqNum;
+ }
+
+ @Override
+ public void setReply(DataStreamReply dataStreamReply) {
+ replyFuture.complete(dataStreamReply);
+ }
+
+ @Override
+ public boolean hasReply() {
+ return replyFuture.isDone();
+ }
+
+ @Override
+ public void fail(Throwable e) {
+ replyFuture.completeExceptionally(e);
+ }
+
+ public CompletableFuture<DataStreamReply> getReplyFuture(){
+ return replyFuture;
+ }
+
+ public DataStreamWindowRequest(long streamId, long messageId,
+ long seqNum, ByteBuffer data){
+ this.streamId = streamId;
+ this.messageId = messageId;
+ this.data = data.slice();
+ this.seqNum = seqNum;
+ }
+ }
+
+ private SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply>
slidingWindow;
+ private Semaphore requestSemaphore;
+ private DataStreamClientRpc dataStreamClientRpc;
+
+ public OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc,
+ RaftProperties properties){
+ this.dataStreamClientRpc = dataStreamClientRpc;
+ this.requestSemaphore = new
Semaphore(RaftClientConfigKeys.Async.outstandingRequestsMax(properties)*2);
+ this.slidingWindow = new SlidingWindow.Client<>("sliding");
+ }
+
+ private void resetSlidingWindow(RaftClientRequest request) {
+ slidingWindow.resetFirstSeqNum();
+ }
+
+ public CompletableFuture<DataStreamReply> sendRequest(long streamId,
+ long messageId,
+ ByteBuffer data){
+ try {
+ requestSemaphore.acquire();
+ } catch (InterruptedException e){
+ return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
+ "Interrupted when sending streamId=" + streamId + ", messageId= " +
messageId, e));
+ }
+ final LongFunction<DataStreamWindowRequest> constructor = seqNum -> new
DataStreamWindowRequest(streamId,
+
messageId, seqNum, data);
+ return slidingWindow.submitNewRequest(constructor,
this::sendRequestToNetwork).
+ getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
+ }
+
+ private void sendRequestToNetwork(DataStreamWindowRequest request){
+ CompletableFuture<DataStreamReply> f = request.getReplyFuture();
+ if(f.isDone()) {
+ return;
+ }
+ if(slidingWindow.isFirst(request.getSeqNum())){
+ request.setFirstRequest();
+ }
+ DataStreamRequestClient rpcRequest = request.newDataStreamRequest();
+ CompletableFuture<DataStreamReply> requestFuture =
dataStreamClientRpc.streamAsync(rpcRequest);
+ requestFuture.thenApply(reply -> {
+ slidingWindow.receiveReply(
+ request.getSeqNum(), reply, this::sendRequestToNetwork);
+ return reply;
+ }).thenAccept(reply -> {
+ if (f.isDone()) {
+ return;
+ }
+ f.complete(reply);
+ });
+ }
+
+
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
index eae8137..b05694c 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
@@ -18,7 +18,7 @@
package org.apache.ratis.protocol;
-public interface DataStreamMessage extends RaftRpcMessage {
+public interface DataStreamMessage{
long getStreamId();
long getDataOffset();
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
similarity index 54%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
copy to
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
index eae8137..fba60f0 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,10 +18,38 @@
package org.apache.ratis.protocol;
-public interface DataStreamMessage extends RaftRpcMessage {
- long getStreamId();
+import java.nio.ByteBuffer;
- long getDataOffset();
+public class DataStreamReplyImpl implements DataStreamReply {
+ private long streamId;
+ private long dataOffset;
+ private ByteBuffer response;
- long getDataLength();
-}
\ No newline at end of file
+ public DataStreamReplyImpl(long streamId,
+ long dataOffset,
+ ByteBuffer bf){
+ this.streamId = streamId;
+ this.dataOffset = dataOffset;
+ this.response = bf;
+ }
+
+ @Override
+ public ByteBuffer getResponse() {
+ return response;
+ }
+
+ @Override
+ public long getStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public long getDataOffset() {
+ return dataOffset;
+ }
+
+ @Override
+ public long getDataLength() {
+ return response.capacity();
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
similarity index 56%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
copy to
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
index eae8137..a34809c 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,10 +18,36 @@
package org.apache.ratis.protocol;
-public interface DataStreamMessage extends RaftRpcMessage {
- long getStreamId();
+import java.nio.ByteBuffer;
- long getDataOffset();
+public class DataStreamRequestClient implements DataStreamRequest{
+ private long streamId;
+ private long dataOffset;
+ private ByteBuffer buf;
- long getDataLength();
-}
\ No newline at end of file
+ public DataStreamRequestClient(long streamId, long dataOffset,
+ ByteBuffer buf){
+ this.streamId = streamId;
+ this.dataOffset = dataOffset;
+ this.buf = buf;
+ }
+
+ @Override
+ public long getStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public long getDataOffset() {
+ return dataOffset;
+ }
+
+ @Override
+ public long getDataLength() {
+ return buf.capacity();
+ }
+
+ public ByteBuffer getBuf() {
+ return buf;
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
new file mode 100644
index 0000000..8906700
--- /dev/null
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+
+public class DataStreamRequestServer implements DataStreamRequest {
+ private long streamId;
+ private long dataOffset;
+ private ByteBuf buf;
+
+ public DataStreamRequestServer(long streamId, long dataOffset,
+ ByteBuf buf){
+ this.streamId = streamId;
+ this.dataOffset = dataOffset;
+ this.buf = buf;
+ }
+
+ @Override
+ public long getStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public long getDataOffset() {
+ return dataOffset;
+ }
+
+ @Override
+ public long getDataLength() {
+ return buf.capacity();
+ }
+
+ public ByteBuf getBuf() {
+ return buf;
+ }
+}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 66c360a..2480188 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -22,8 +22,8 @@ import org.apache.ratis.client.DataStreamClientFactory;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.netty.client.NettyClientStreamRpc;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DataStreamServerFactory;
@@ -36,12 +36,12 @@ public class NettyDataStreamFactory implements
DataStreamServerFactory, DataStre
}
@Override
- public DataStreamClientRpc newDataStreamClientRpc(ClientId clientId,
RaftProperties properties) {
- return null;
+ public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server,
RaftProperties properties) {
+ return new NettyClientStreamRpc(server, properties);
}
@Override
- public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
+ public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server) {
return null;
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
new file mode 100644
index 0000000..c59a928
--- /dev/null
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.decoders.DataStreamReplyDecoder;
+import org.apache.ratis.netty.encoders.DataStreamRequestEncoder;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.ratis.thirdparty.io.netty.channel.*;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
+import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+public class NettyClientStreamRpc implements DataStreamClientRpc {
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientStreamRpc.class);
+
+ private RaftPeer server;
+ private RaftProperties raftProperties;
+ private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private Channel channel;
+ private Queue<CompletableFuture<DataStreamReply>> replies
+ = new LinkedList<>();
+
+ public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
+ this.server = server;
+ this.raftProperties = properties;
+ }
+
+ synchronized CompletableFuture<DataStreamReply> pollReply() {
+ return replies.poll();
+ }
+
+ private ChannelInboundHandler getClientHandler(){
+ return new ChannelInboundHandlerAdapter(){
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
InterruptedException {
+ final DataStreamReplyImpl reply = (DataStreamReplyImpl) msg;
+ CompletableFuture<DataStreamReply> f = pollReply();
+ f.complete(reply);
+ }
+ };
+ }
+
+ private ChannelInitializer<SocketChannel> getInitializer(){
+ return new ChannelInitializer<SocketChannel>(){
+ @Override
+ public void initChannel(SocketChannel ch)
+ throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new DataStreamRequestEncoder());
+ p.addLast(new DataStreamReplyDecoder());
+ p.addLast(getClientHandler());
+ }
+ };
+ }
+
+ @Override
+ public synchronized CompletableFuture<DataStreamReply>
streamAsync(DataStreamRequest request) {
+ CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
+ replies.offer(f);
+ channel.writeAndFlush(request);
+ return f;
+ }
+
+ @Override
+ public void startClient() {
+ final InetSocketAddress address =
NetUtils.createSocketAddr(server.getAddress());
+ try {
+ channel = (new Bootstrap())
+ .group(workerGroup)
+ .channel(NioSocketChannel.class)
+ .handler(getInitializer())
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .connect(address)
+ .sync()
+ .channel();
+ System.out.println(channel);
+ } catch (Exception e){
+ LOG.info("Exception {}", e.getCause());
+ }
+ }
+
+ @Override
+ public void closeClient(){
+ channel.close().syncUninterruptibly();
+ }
+}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
new file mode 100644
index 0000000..2417707
--- /dev/null
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.decoders;
+
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+public class DataStreamReplyDecoder extends ByteToMessageDecoder {
+
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext,
+ ByteBuf byteBuf, List<Object> list) throws Exception {
+
+ if(byteBuf.readableBytes() >= 24){
+ long streamId = byteBuf.readLong();
+ long dataOffset = byteBuf.readLong();
+ long dataLength = byteBuf.readLong();
+ if(byteBuf.readableBytes() >= dataLength){
+ DataStreamReplyImpl reply = new DataStreamReplyImpl(streamId,
+ dataOffset,
+ byteBuf.slice(byteBuf.readerIndex(), (int)
dataLength).nioBuffer());
+ byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
+ byteBuf.markReaderIndex();
+ list.add(reply);
+ } else {
+ byteBuf.resetReaderIndex();
+ return;
+ }
+ } else{
+ return;
+ }
+ }
+}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
new file mode 100644
index 0000000..b0aca36
--- /dev/null
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.decoders;
+
+import org.apache.ratis.protocol.DataStreamRequestServer;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+public class DataStreamRequestDecoder extends ByteToMessageDecoder {
+
+ public DataStreamRequestDecoder() {
+ this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext,
+ ByteBuf byteBuf,
+ List<Object> list) throws Exception {
+ if(byteBuf.readableBytes() >= 24){
+ long streamId = byteBuf.readLong();
+ long dataOffset = byteBuf.readLong();
+ long dataLength = byteBuf.readLong();
+ if(byteBuf.readableBytes() >= dataLength){
+ ByteBuf bf = byteBuf.slice(byteBuf.readerIndex(), (int)dataLength);
+ bf.retain();
+ DataStreamRequestServer req = new DataStreamRequestServer(streamId,
+ dataOffset,
+ bf);
+ byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
+ byteBuf.markReaderIndex();
+ list.add(req);
+ } else {
+ byteBuf.resetReaderIndex();
+ return;
+ }
+ } else{
+ return;
+ }
+ }
+}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
new file mode 100644
index 0000000..2d7957d
--- /dev/null
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.encoders;
+
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class DataStreamReplyEncoder extends
+ MessageToMessageEncoder<DataStreamReplyImpl> {
+
+ @Override
+ protected void encode(ChannelHandlerContext channelHandlerContext,
+ DataStreamReplyImpl dataStreamReply,
+ List<Object> list) throws Exception {
+
+ ByteBuffer bb = ByteBuffer.allocateDirect(24);
+ bb.putLong(dataStreamReply.getStreamId());
+ bb.putLong(dataStreamReply.getDataOffset());
+ bb.putLong(dataStreamReply.getDataLength());
+ bb.flip();
+ list.add(Unpooled.wrappedBuffer(bb, dataStreamReply.getResponse()));
+ }
+}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
new file mode 100644
index 0000000..84bf570
--- /dev/null
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.encoders;
+
+import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class DataStreamRequestEncoder
+ extends MessageToMessageEncoder<DataStreamRequestClient> {
+
+ @Override
+ protected void encode(ChannelHandlerContext channelHandlerContext,
+ DataStreamRequestClient requestData, List<Object>
list) throws Exception {
+ ByteBuffer bb = ByteBuffer.allocateDirect(24);
+ bb.putLong(requestData.getStreamId());
+ bb.putLong(requestData.getDataOffset());
+ bb.putLong(requestData.getDataLength());
+ bb.flip();
+ list.add(Unpooled.wrappedBuffer(bb, requestData.getBuf()));
+ }
+}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServer.java
similarity index 73%
rename from
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/DataStreamServer.java
index 03fd4b3..434aee6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServer.java
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.client.api;
+package org.apache.ratis.server;
/**
- * An interface for streaming data.
- * Associated with it's implementation will be a client.
+ * Interface for streaming server.
*/
-
-public interface DataStreamApi {
-
+public interface DataStreamServer {
/**
- * Create a new stream for a new streamToRatis invocation
- * allows multiple stream from a single client.
+ * Get network interface for server.
*/
- DataStreamOutput stream();
+ DataStreamServerRpc getServerRpc();
+ /**
+ * close server.
+ */
+ void close();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index f0c3bd9..f22e48f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -18,10 +18,22 @@
package org.apache.ratis.server;
import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.ServerFactory;
public interface DataStreamServerFactory extends DataStreamFactory {
+
+ static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
+ if (dataStreamFactory instanceof DataStreamFactory) {
+ return (DataStreamServerFactory)dataStreamFactory;
+ }
+ throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
+ + " to " + ServerFactory.class
+ + "; rpc type is " + dataStreamFactory.getDataStreamType());
+ }
+
/**
* Server implementation for streaming in Raft group
*/
- DataStreamServerRpc newDataStreamServerRpc(RaftServer server);
+ DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index c5a074b..f8a270e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -22,4 +22,14 @@ package org.apache.ratis.server;
* Relays those streams to other servers after persisting
*/
public interface DataStreamServerRpc {
+ /**
+ * start server
+ */
+ void startServer();
+
+ /**
+ * shutdown server
+ */
+ void closeServer();
+
}