This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a475808 RATIS-1129. Move out the RPC related APIs from
DataStreamOutput. (#254)
a475808 is described below
commit a475808f41b5dcabde85341d3c065591fbde7d94
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Nov 5 07:48:04 2020 +0800
RATIS-1129. Move out the RPC related APIs from DataStreamOutput. (#254)
---
...aStreamOutput.java => DataStreamOutputRpc.java} | 18 +++++-------
.../apache/ratis/client/api/DataStreamOutput.java | 9 ------
.../apache/ratis/client/api/MessageStreamApi.java | 6 ++--
.../ratis/client/impl/DataStreamClientImpl.java | 8 +++---
.../ratis/netty/server/NettyServerStreamRpc.java | 32 ++++++++++++----------
.../ratis/datastream/DataStreamBaseTest.java | 10 ++-----
6 files changed, 33 insertions(+), 50 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
similarity index 77%
copy from
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
copy to
ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
index 429a422..19fb46b 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
@@ -15,25 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.client.api;
+package org.apache.ratis.client;
-import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.protocol.DataStreamReply;
-import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-/** An asynchronous output stream supporting zero buffer copying. */
-public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
- /** Send out the data in the buffer asynchronously */
- CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
-
- /** Create a transaction asynchronously once the stream data is replicated
to all servers */
- CompletableFuture<DataStreamReply> startTransactionAsync();
-
+/** An RPC interface which extends the user interface {@link
DataStreamOutput}. */
+public interface DataStreamOutputRpc extends DataStreamOutput {
/** Get the future of the header request. */
CompletableFuture<DataStreamReply> getHeaderFuture();
/** Peer close asynchronously. */
CompletableFuture<DataStreamReply> closeForwardAsync();
+
+ /** Create a transaction asynchronously once the stream data is replicated
to all servers */
+ CompletableFuture<DataStreamReply> startTransactionAsync();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 429a422..dd86569 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -27,13 +27,4 @@ import java.util.concurrent.CompletableFuture;
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
/** Send out the data in the buffer asynchronously */
CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
-
- /** Create a transaction asynchronously once the stream data is replicated
to all servers */
- CompletableFuture<DataStreamReply> startTransactionAsync();
-
- /** Get the future of the header request. */
- CompletableFuture<DataStreamReply> getHeaderFuture();
-
- /** Peer close asynchronously. */
- CompletableFuture<DataStreamReply> closeForwardAsync();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
index 27cfa7f..9a816c8 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
@@ -31,11 +31,11 @@ import java.util.concurrent.CompletableFuture;
* the leader creates a raft log entry for the request
* and then replicates the log entry to all the followers.
*
- * Note that this API is similar to {@link
org.apache.ratis.client.RaftClient#sendAsync(Message)}
+ * Note that this API is similar to {@link AsyncApi#send(Message)}
* except that {@link MessageStreamApi} divides a (large) message into
multiple (small) sub-messages in the stream
- * but {@link org.apache.ratis.client.RaftClient#sendAsync(Message)} sends the
entire message in a single RPC.
+ * but {@link AsyncApi#send(Message)} sends the entire message in a single RPC.
* For sending large messages,
- * {@link MessageStreamApi} is more efficient than {@link
org.apache.ratis.client.RaftClient#sendAsync(Message)}}.
+ * {@link MessageStreamApi} is more efficient than {@link
AsyncApi#send(Message)}}.
*
* Note also that this API is different from {@link DataStreamApi} in the
sense that
* this API streams messages only to the leader
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 7f3546e..2690091 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -21,7 +21,7 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientFactory;
import org.apache.ratis.client.DataStreamClientRpc;
-import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
@@ -61,7 +61,7 @@ public class DataStreamClientImpl implements DataStreamClient
{
this.orderedStreamAsync = new OrderedStreamAsync(clientId,
dataStreamClientRpc, properties);
}
- public class DataStreamOutputImpl implements DataStreamOutput {
+ public class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
@@ -122,12 +122,12 @@ public class DataStreamClientImpl implements
DataStreamClient {
}
@Override
- public DataStreamOutput stream() {
+ public DataStreamOutputRpc stream() {
return stream(groupId);
}
@Override
- public DataStreamOutput stream(RaftGroupId gid) {
+ public DataStreamOutputRpc stream(RaftGroupId gid) {
return new DataStreamOutputImpl(gid);
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 3e7cce8..911af6c 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -19,7 +19,7 @@
package org.apache.ratis.netty.server;
import org.apache.ratis.client.DataStreamClient;
-import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
@@ -95,8 +95,8 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
peers.addAll(newPeers);
}
- List<DataStreamOutput> getDataStreamOutput(RaftGroupId groupId) throws
IOException {
- final List<DataStreamOutput> outs = new ArrayList<>();
+ List<DataStreamOutputRpc> getDataStreamOutput(RaftGroupId groupId) throws
IOException {
+ final List<DataStreamOutputRpc> outs = new ArrayList<>();
try {
getDataStreamOutput(outs, groupId);
} catch (IOException e) {
@@ -106,10 +106,10 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return outs;
}
- private void getDataStreamOutput(List<DataStreamOutput> outs, RaftGroupId
groupId) throws IOException {
+ private void getDataStreamOutput(List<DataStreamOutputRpc> outs,
RaftGroupId groupId) throws IOException {
for (RaftPeer peer : peers) {
try {
- outs.add(map.getProxy(peer.getId()).stream(groupId));
+ outs.add((DataStreamOutputRpc)
map.getProxy(peer.getId()).stream(groupId));
} catch (IOException e) {
throw new IOException(map.getName() + ": Failed to
getDataStreamOutput for " + peer, e);
}
@@ -124,11 +124,11 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
static class StreamInfo {
private final RaftClientRequest request;
private final CompletableFuture<DataStream> stream;
- private final List<DataStreamOutput> outs;
+ private final List<DataStreamOutputRpc> outs;
private final AtomicReference<CompletableFuture<?>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
- StreamInfo(RaftClientRequest request, CompletableFuture<DataStream>
stream, List<DataStreamOutput> outs) {
+ StreamInfo(RaftClientRequest request, CompletableFuture<DataStream>
stream, List<DataStreamOutputRpc> outs) {
this.request = request;
this.stream = stream;
this.outs = outs;
@@ -138,7 +138,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return stream;
}
- List<DataStreamOutput> getDataStreamOutputs() {
+ List<DataStreamOutputRpc> getDataStreamOutputs() {
return outs;
}
@@ -319,7 +319,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
} else if (request.getType() == Type.START_TRANSACTION){
sendReplyNotSuccess(request, ctx);
} else {
- LOG.error("{}: Unexpected type:{}", this, request.getType());
+ throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
}
}, executorService);
} catch (IOException e) {
@@ -331,7 +331,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private void forwardStartTransaction(
final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
final List<CompletableFuture<Boolean>> results = new ArrayList<>();
- for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
if (reply.isSuccess()) {
final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
@@ -364,7 +364,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
- for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
} else if (request.getType() == Type.STREAM_DATA) {
@@ -372,7 +372,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
final CompletableFuture<?> previous = info.getPrevious().get();
localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) ->
writeTo(buf, stream), executorService);
- for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
remoteWrites.add(previous.thenComposeAsync(v ->
out.writeAsync(request.slice().nioBuffer()), executorService));
}
} else if (request.getType() == Type.STREAM_CLOSE || request.getType() ==
Type.STREAM_CLOSE_FORWARD) {
@@ -389,16 +389,18 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}, executorService);
if (request.getType() == Type.STREAM_CLOSE) {
- for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
remoteWrites.add(previous.thenComposeAsync(v ->
out.closeForwardAsync(), executorService));
}
}
- } else {
+ } else if (request.getType() == Type.START_TRANSACTION) {
// peer server start transaction
info = streams.get(key);
final CompletableFuture<?> previous = info.getPrevious().get();
previous.thenApplyAsync(v -> startTransaction(streams.get(key), request,
ctx), executorService);
return;
+ } else {
+ throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
}
final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
@@ -413,7 +415,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
// TODO(runzhiwang): send start transaction to leader directly
startTransaction(info, request, ctx);
} else {
- LOG.error("{}: Unexpected type:{}", this, request.getType());
+ throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
}
return null;
}, executorService);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index d034350..a7e134f 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -150,17 +150,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
- class Server {
+ static class Server {
private final RaftPeer peer;
private final RaftServer raftServer;
private final DataStreamServerImpl dataStreamServer;
- Server(RaftPeer peer) {
- this.peer = peer;
- this.raftServer = newRaftServer(peer, properties);
- this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
- }
-
Server(RaftPeer peer, RaftServer raftServer) {
this.peer = peer;
this.raftServer = raftServer;
@@ -423,7 +417,7 @@ abstract class DataStreamBaseTest extends BaseTest {
RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
Assert.assertTrue(replyByteBuffer.isSuccess());
Assert.assertEquals(clientReply.getCallId(),
expectedClientReply.getCallId());
-
Assert.assertTrue(clientReply.getClientId().equals(expectedClientReply.getClientId()));
+ Assert.assertEquals(clientReply.getClientId(),
expectedClientReply.getClientId());
Assert.assertEquals(clientReply.getLogIndex(),
expectedClientReply.getLogIndex());
}
}