This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new df436c73f RATIS-2035. Refactor streaming code for Read. (#1046)
df436c73f is described below
commit df436c73f66f192a2fb9f8e43500196e4e3a7cec
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Mar 6 22:29:50 2024 -0800
RATIS-2035. Refactor streaming code for Read. (#1046)
---
.../org/apache/ratis/netty/server/ChannelMap.java | 48 +++++++++++++++++++
.../ratis/netty/server/DataStreamManagement.java | 47 +-----------------
.../org/apache/ratis/netty/server/StreamMap.java | 55 ++++++++++++++++++++++
.../apache/ratis/server/DataStreamServerRpc.java | 10 +---
.../org/apache/ratis/server/RaftServerRpc.java | 9 +---
.../{DataStreamServerRpc.java => ServerRpc.java} | 14 ++----
.../ratis/datastream/DataStreamBaseTest.java | 4 +-
7 files changed, 113 insertions(+), 74 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
new file mode 100644
index 000000000..7b0d76184
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Map: {@link ChannelId} -> {@link ClientInvocationId}s. */
+class ChannelMap {
+ private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();
+
+ void add(ChannelId channelId, ClientInvocationId clientInvocationId) {
+ map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>())
+ .put(clientInvocationId, clientInvocationId);
+ }
+
+ void remove(ChannelId channelId, ClientInvocationId clientInvocationId) {
+ Optional.ofNullable(map.get(channelId))
+ .ifPresent((ids) -> ids.remove(clientInvocationId));
+ }
+
+ Set<ClientInvocationId> remove(ChannelId channelId) {
+ return Optional.ofNullable(map.remove(channelId))
+ .map(Map::keySet)
+ .orElse(Collections.emptySet());
+ }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 302aed998..a6e9b815e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -70,13 +70,10 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -219,52 +216,10 @@ public class DataStreamManagement {
}
}
- static class StreamMap {
- private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap<>();
-
- StreamInfo computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, StreamInfo> function) {
- final StreamInfo info = map.computeIfAbsent(key, function);
- LOG.debug("computeIfAbsent({}) returns {}", key, info);
- return info;
- }
-
- StreamInfo get(ClientInvocationId key) {
- final StreamInfo info = map.get(key);
- LOG.debug("get({}) returns {}", key, info);
- return info;
- }
-
- StreamInfo remove(ClientInvocationId key) {
- final StreamInfo info = map.remove(key);
- LOG.debug("remove({}) returns {}", key, info);
- return info;
- }
- }
-
- public static class ChannelMap {
- private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();
-
- public void add(ChannelId channelId,
- ClientInvocationId clientInvocationId) {
- map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
- }
-
- public void remove(ChannelId channelId,
- ClientInvocationId clientInvocationId) {
- Optional.ofNullable(map.get(channelId)).ifPresent((ids) -> ids.remove(clientInvocationId));
- }
-
- public Set<ClientInvocationId> remove(ChannelId channelId) {
- return Optional.ofNullable(map.remove(channelId))
- .map(Map::keySet)
- .orElse(Collections.emptySet());
- }
- }
-
private final RaftServer server;
private final String name;
- private final StreamMap streams = new StreamMap();
+ private final StreamMap<StreamInfo> streams = new StreamMap<>();
private final ChannelMap channels;
private final ExecutorService requestExecutor;
private final ExecutorService writeExecutor;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
new file mode 100644
index 000000000..073698cb8
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+/**
+ * Map: {@link ClientInvocationId} -> {@link STREAM}.
+ *
+ * @param <STREAM> the stream type.
+ */
+class StreamMap<STREAM> {
+ public static final Logger LOG = LoggerFactory.getLogger(StreamMap.class);
+
+ private final ConcurrentMap<ClientInvocationId, STREAM> map = new ConcurrentHashMap<>();
+
+ STREAM computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, STREAM> function) {
+ final STREAM info = map.computeIfAbsent(key, function);
+ LOG.debug("computeIfAbsent({}) returns {}", key, info);
+ return info;
+ }
+
+ STREAM get(ClientInvocationId key) {
+ final STREAM info = map.get(key);
+ LOG.debug("get({}) returns {}", key, info);
+ return info;
+ }
+
+ STREAM remove(ClientInvocationId key) {
+ final STREAM info = map.remove(key);
+ LOG.debug("remove({}) returns {}", key, info);
+ return info;
+ }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index 4e948c6f8..6316ef607 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -20,18 +20,10 @@ package org.apache.ratis.server;
import org.apache.ratis.protocol.RaftPeer;
import java.io.Closeable;
-import java.net.InetSocketAddress;
/**
* A server interface handling incoming streams
* Relays those streams to other servers after persisting
*/
-public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
- /**
- * start server
- */
- void start();
-
- /** @return the address where this RPC server is listening to. */
- InetSocketAddress getInetSocketAddress();
+public interface DataStreamServerRpc extends ServerRpc, RaftPeer.Add, Closeable {
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index d81f9cc8b..76bd817f5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -26,20 +26,13 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.util.JavaUtils;
import java.io.Closeable;
-import java.io.IOException;
import java.net.InetSocketAddress;
/**
* An server-side interface for supporting different RPC implementations
* such as Netty, gRPC and Hadoop.
*/
-public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, RaftPeer.Add, Closeable {
- /** Start the RPC service. */
- void start() throws IOException;
-
- /** @return the address where this RPC server is listening */
- InetSocketAddress getInetSocketAddress();
-
+public interface RaftServerRpc extends RaftServerProtocol, ServerRpc, RpcType.Get, RaftPeer.Add, Closeable {
/** @return the address where this RPC server is listening for client requests */
default InetSocketAddress getClientServerAddress() {
return getInetSocketAddress();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
similarity index 79%
copy from ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
copy to ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
index 4e948c6f8..6ad5eacf1 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
@@ -17,20 +17,16 @@
*/
package org.apache.ratis.server;
-import org.apache.ratis.protocol.RaftPeer;
-
import java.io.Closeable;
+import java.io.IOException;
import java.net.InetSocketAddress;
/**
- * A server interface handling incoming streams
- * Relays those streams to other servers after persisting
+ * A general server interface.
*/
-public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
- /**
- * start server
- */
- void start();
+public interface ServerRpc extends Closeable {
+ /** Start the RPC service. */
+ void start() throws IOException;
/** @return the address where this RPC server is listening to. */
InetSocketAddress getInetSocketAddress();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 70e26af24..2ac01ac1f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -67,7 +67,7 @@ abstract class DataStreamBaseTest extends BaseTest {
return raftServer;
}
- void start() {
+ void start() throws IOException {
dataStreamServer.getServerRpc().start();
}
@@ -90,7 +90,7 @@ abstract class DataStreamBaseTest extends BaseTest {
return servers.get(0);
}
- void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
+ void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) throws Exception {
raftGroup = RaftGroup.valueOf(groupId, peers);
this.peers = peers;
servers = new ArrayList<>(peers.size());