This is an automated email from the ASF dual-hosted git repository.

guohao1225 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e2ea51b56 RATIS-1923. Netty: atomic operations require side-effect-free functions. (#956)
e2ea51b56 is described below

commit e2ea51b56464c072eef309b10320302c9ef292e7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Nov 1 18:53:35 2023 -0700

    RATIS-1923. Netty: atomic operations require side-effect-free functions. (#956)
    
    * RATIS-1923. Netty: atomic operations require side-effect-free functions.
---
 .../ratis/netty/client/NettyClientStreamRpc.java   | 30 ++++++++++++++--------
 .../ratis/netty/server/DataStreamManagement.java   | 25 ++++++++++--------
 2 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index e4c154fd2..e6ce29a46 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -120,18 +120,23 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     private final Supplier<ChannelInitializer<SocketChannel>> 
channelInitializerSupplier;
 
     /** The {@link ChannelFuture} is null when this connection is closed. */
-    private final AtomicReference<ChannelFuture> ref;
+    private final AtomicReference<Supplier<ChannelFuture>> ref;
 
     Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
         Supplier<ChannelInitializer<SocketChannel>> 
channelInitializerSupplier) {
       this.address = address;
       this.workerGroup = workerGroup;
       this.channelInitializerSupplier = channelInitializerSupplier;
-      this.ref = new AtomicReference<>(connect());
+      this.ref = new 
AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
+    }
+
+    ChannelFuture getChannelFuture() {
+      final Supplier<ChannelFuture> referenced = ref.get();
+      return referenced != null? referenced.get(): null;
     }
 
     Channel getChannelUninterruptibly() {
-      final ChannelFuture future = ref.get();
+      final ChannelFuture future = getChannelFuture();
       if (future == null) {
         return null; //closed
       }
@@ -176,7 +181,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
     private synchronized ChannelFuture reconnect() {
       // concurrent reconnect double check
-      ChannelFuture channelFuture = ref.get();
+      final ChannelFuture channelFuture = getChannelFuture();
       if (channelFuture != null) {
         Channel channel = channelFuture.syncUninterruptibly().channel();
         if (channel.isActive()) {
@@ -184,19 +189,24 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         }
       }
 
-      final MemoizedSupplier<ChannelFuture> supplier = 
MemoizedSupplier.valueOf(this::connect);
-      final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? 
null: supplier.get());
+      // Two levels of MemoizedSupplier as a side-effect-free function:
+      // AtomicReference.getAndUpdate may call the update function multiple times and discard the old objects.
+      // The outer supplier creates only an inner supplier, which can be discarded without any leakage.
+      // The inner supplier will be invoked (i.e. connect) ONLY IF it is successfully set to the reference.
+      final MemoizedSupplier<Supplier<ChannelFuture>> supplier = 
MemoizedSupplier.valueOf(
+          () -> MemoizedSupplier.valueOf(this::connect));
+      final Supplier<ChannelFuture> previous = ref.getAndUpdate(prev -> prev 
== null? null: supplier.get());
       if (previous != null) {
-        previous.channel().close();
+        previous.get().channel().close();
       }
-      return supplier.isInitialized() ? supplier.get() : null;
+      return getChannelFuture();
     }
 
     void close() {
-      final ChannelFuture previous = ref.getAndSet(null);
+      final Supplier<ChannelFuture> previous = ref.getAndSet(null);
       if (previous != null) {
         // wait channel closed, do shutdown workerGroup
-        previous.channel().close().addListener((future) -> 
workerGroup.shutdownGracefully());
+        previous.get().channel().close().addListener(future -> 
workerGroup.shutdownGracefully());
       }
     }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index a4cc537dd..958a1ad45 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -46,6 +46,7 @@ import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServer.Division;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
@@ -275,22 +276,24 @@ public class DataStreamManagement {
     this.nettyServerStreamRpcMetrics = metrics;
   }
 
+  private CompletableFuture<DataStream> stream(RaftClientRequest request, 
StateMachine stateMachine) {
+    final RequestMetrics metrics = 
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
+    final Timekeeper.Context context = metrics.start();
+    return stateMachine.data().stream(request)
+        .whenComplete((r, e) -> metrics.stop(context, e == null));
+  }
+
   private CompletableFuture<DataStream> 
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
     final Division division = server.getDivision(request.getRaftGroupId());
     final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(request);
-    final MemoizedSupplier<CompletableFuture<DataStream>> supplier = 
JavaUtils.memoize(
-        () -> {
-          final RequestMetrics metrics = 
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
-          final Timekeeper.Context context = metrics.start();
-          return division.getStateMachine().data().stream(request)
-              .whenComplete((r, e) -> metrics.stop(context, e == null));
-        });
-    final CompletableFuture<DataStream> f = division.getDataStreamMap()
-        .computeIfAbsent(invocationId, key -> supplier.get());
-    if (!supplier.isInitialized()) {
+    final CompletableFuture<DataStream> created = new CompletableFuture<>();
+    final CompletableFuture<DataStream> returned = division.getDataStreamMap()
+        .computeIfAbsent(invocationId, key -> created);
+    if (returned != created) {
       throw new AlreadyExistsException("A DataStream already exists for " + 
invocationId);
     }
-    return f;
+    stream(request, 
division.getStateMachine()).whenComplete(JavaUtils.asBiConsumer(created));
+    return created;
   }
 
   private StreamInfo newStreamInfo(ByteBuf buf,

Reply via email to