This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new df436c73f RATIS-2035. Refactor streaming code for Read. (#1046)
df436c73f is described below

commit df436c73f66f192a2fb9f8e43500196e4e3a7cec
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Mar 6 22:29:50 2024 -0800

    RATIS-2035. Refactor streaming code for Read. (#1046)
---
 .../org/apache/ratis/netty/server/ChannelMap.java  | 48 +++++++++++++++++++
 .../ratis/netty/server/DataStreamManagement.java   | 47 +-----------------
 .../org/apache/ratis/netty/server/StreamMap.java   | 55 ++++++++++++++++++++++
 .../apache/ratis/server/DataStreamServerRpc.java   | 10 +---
 .../org/apache/ratis/server/RaftServerRpc.java     |  9 +---
 .../{DataStreamServerRpc.java => ServerRpc.java}   | 14 ++----
 .../ratis/datastream/DataStreamBaseTest.java       |  4 +-
 7 files changed, 113 insertions(+), 74 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
new file mode 100644
index 000000000..7b0d76184
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Map: {@link ChannelId} -> {@link ClientInvocationId}s. */
+class ChannelMap {
+  private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> 
map = new ConcurrentHashMap<>();
+
+  void add(ChannelId channelId, ClientInvocationId clientInvocationId) {
+    map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>())
+        .put(clientInvocationId, clientInvocationId);
+  }
+
+  void remove(ChannelId channelId, ClientInvocationId clientInvocationId) {
+    Optional.ofNullable(map.get(channelId))
+        .ifPresent((ids) -> ids.remove(clientInvocationId));
+  }
+
+  Set<ClientInvocationId> remove(ChannelId channelId) {
+    return Optional.ofNullable(map.remove(channelId))
+        .map(Map::keySet)
+        .orElse(Collections.emptySet());
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 302aed998..a6e9b815e 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -70,13 +70,10 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
@@ -219,52 +216,10 @@ public class DataStreamManagement {
     }
   }
 
-  static class StreamMap {
-    private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new 
ConcurrentHashMap<>();
-
-    StreamInfo computeIfAbsent(ClientInvocationId key, 
Function<ClientInvocationId, StreamInfo> function) {
-      final StreamInfo info = map.computeIfAbsent(key, function);
-      LOG.debug("computeIfAbsent({}) returns {}", key, info);
-      return info;
-    }
-
-    StreamInfo get(ClientInvocationId key) {
-      final StreamInfo info = map.get(key);
-      LOG.debug("get({}) returns {}", key, info);
-      return info;
-    }
-
-    StreamInfo remove(ClientInvocationId key) {
-      final StreamInfo info = map.remove(key);
-      LOG.debug("remove({}) returns {}", key, info);
-      return info;
-    }
-  }
-
-  public static class ChannelMap {
-    private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> 
map = new ConcurrentHashMap<>();
-
-    public void add(ChannelId channelId,
-                    ClientInvocationId clientInvocationId) {
-      map.computeIfAbsent(channelId, (e) -> new 
ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
-    }
-
-    public void remove(ChannelId channelId,
-                       ClientInvocationId clientInvocationId) {
-      Optional.ofNullable(map.get(channelId)).ifPresent((ids) -> 
ids.remove(clientInvocationId));
-    }
-
-    public Set<ClientInvocationId> remove(ChannelId channelId) {
-      return Optional.ofNullable(map.remove(channelId))
-          .map(Map::keySet)
-          .orElse(Collections.emptySet());
-    }
-  }
-
   private final RaftServer server;
   private final String name;
 
-  private final StreamMap streams = new StreamMap();
+  private final StreamMap<StreamInfo> streams = new StreamMap<>();
   private final ChannelMap channels;
   private final ExecutorService requestExecutor;
   private final ExecutorService writeExecutor;
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
new file mode 100644
index 000000000..073698cb8
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+/**
+ * Map: {@link ClientInvocationId} -> {@link STREAM}.
+ *
+ * @param <STREAM> the stream type.
+ */
+class StreamMap<STREAM> {
+  public static final Logger LOG = LoggerFactory.getLogger(StreamMap.class);
+
+  private final ConcurrentMap<ClientInvocationId, STREAM> map = new 
ConcurrentHashMap<>();
+
+  STREAM computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, 
STREAM> function) {
+    final STREAM info = map.computeIfAbsent(key, function);
+    LOG.debug("computeIfAbsent({}) returns {}", key, info);
+    return info;
+  }
+
+  STREAM get(ClientInvocationId key) {
+    final STREAM info = map.get(key);
+    LOG.debug("get({}) returns {}", key, info);
+    return info;
+  }
+
+  STREAM remove(ClientInvocationId key) {
+    final STREAM info = map.remove(key);
+    LOG.debug("remove({}) returns {}", key, info);
+    return info;
+  }
+}
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index 4e948c6f8..6316ef607 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -20,18 +20,10 @@ package org.apache.ratis.server;
 import org.apache.ratis.protocol.RaftPeer;
 
 import java.io.Closeable;
-import java.net.InetSocketAddress;
 
 /**
  * A server interface handling incoming streams
  * Relays those streams to other servers after persisting
  */
-public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
-  /**
-   * start server
-   */
-  void start();
-
-  /** @return the address where this RPC server is listening to. */
-  InetSocketAddress getInetSocketAddress();
+public interface DataStreamServerRpc extends ServerRpc, RaftPeer.Add, 
Closeable {
 }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index d81f9cc8b..76bd817f5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -26,20 +26,13 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.util.JavaUtils;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 
 /**
  * An server-side interface for supporting different RPC implementations
  * such as Netty, gRPC and Hadoop.
  */
-public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, 
RaftPeer.Add, Closeable {
-  /** Start the RPC service. */
-  void start() throws IOException;
-
-  /** @return the address where this RPC server is listening */
-  InetSocketAddress getInetSocketAddress();
-
+public interface RaftServerRpc extends RaftServerProtocol, ServerRpc, 
RpcType.Get, RaftPeer.Add, Closeable {
   /** @return the address where this RPC server is listening for client 
requests */
   default InetSocketAddress getClientServerAddress() {
     return getInetSocketAddress();
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
 b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
similarity index 79%
copy from 
ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
copy to ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
index 4e948c6f8..6ad5eacf1 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
@@ -17,20 +17,16 @@
  */
 package org.apache.ratis.server;
 
-import org.apache.ratis.protocol.RaftPeer;
-
 import java.io.Closeable;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 /**
- * A server interface handling incoming streams
- * Relays those streams to other servers after persisting
+ * A general server interface.
  */
-public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
-  /**
-   * start server
-   */
-  void start();
+public interface ServerRpc extends Closeable {
+  /** Start the RPC service. */
+  void start() throws IOException;
 
   /** @return the address where this RPC server is listening to. */
   InetSocketAddress getInetSocketAddress();
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 70e26af24..2ac01ac1f 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -67,7 +67,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       return raftServer;
     }
 
-    void start() {
+    void start() throws IOException {
       dataStreamServer.getServerRpc().start();
     }
 
@@ -90,7 +90,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     return servers.get(0);
   }
 
-  void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> 
raftServers) {
+  void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> 
raftServers) throws Exception {
     raftGroup = RaftGroup.valueOf(groupId, peers);
     this.peers = peers;
     servers = new ArrayList<>(peers.size());

Reply via email to