This is an automated email from the ASF dual-hosted git repository.
guohao1225 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e2ea51b56 RATIS-1923. Netty: atomic operations require side-effect-free functions. (#956)
e2ea51b56 is described below
commit e2ea51b56464c072eef309b10320302c9ef292e7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Nov 1 18:53:35 2023 -0700
RATIS-1923. Netty: atomic operations require side-effect-free functions. (#956)
* RATIS-1923. Netty: atomic operations require side-effect-free functions.
---
.../ratis/netty/client/NettyClientStreamRpc.java | 30 ++++++++++++++--------
.../ratis/netty/server/DataStreamManagement.java | 25 ++++++++++--------
2 files changed, 34 insertions(+), 21 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index e4c154fd2..e6ce29a46 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -120,18 +120,23 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private final Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier;
/** The {@link ChannelFuture} is null when this connection is closed. */
- private final AtomicReference<ChannelFuture> ref;
+ private final AtomicReference<Supplier<ChannelFuture>> ref;
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier) {
this.address = address;
this.workerGroup = workerGroup;
this.channelInitializerSupplier = channelInitializerSupplier;
- this.ref = new AtomicReference<>(connect());
+ this.ref = new
AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
+ }
+
+ ChannelFuture getChannelFuture() {
+ final Supplier<ChannelFuture> referenced = ref.get();
+ return referenced != null? referenced.get(): null;
}
Channel getChannelUninterruptibly() {
- final ChannelFuture future = ref.get();
+ final ChannelFuture future = getChannelFuture();
if (future == null) {
return null; //closed
}
@@ -176,7 +181,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private synchronized ChannelFuture reconnect() {
// concurrent reconnect double check
- ChannelFuture channelFuture = ref.get();
+ final ChannelFuture channelFuture = getChannelFuture();
if (channelFuture != null) {
Channel channel = channelFuture.syncUninterruptibly().channel();
if (channel.isActive()) {
@@ -184,19 +189,24 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
}
- final MemoizedSupplier<ChannelFuture> supplier =
MemoizedSupplier.valueOf(this::connect);
- final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null?
null: supplier.get());
+ // Two levels of MemoizedSupplier as a side-effect-free function:
+ // AtomicReference.getAndUpdate may call the update function multiple
times and discard the old objects.
+ // The outer supplier creates only an inner supplier, which can be
discarded without any leakage.
+ // The inner supplier will be invoked (i.e. connect) ONLY IF it is
successfully set to the reference.
+ final MemoizedSupplier<Supplier<ChannelFuture>> supplier =
MemoizedSupplier.valueOf(
+ () -> MemoizedSupplier.valueOf(this::connect));
+ final Supplier<ChannelFuture> previous = ref.getAndUpdate(prev -> prev
== null? null: supplier.get());
if (previous != null) {
- previous.channel().close();
+ previous.get().channel().close();
}
- return supplier.isInitialized() ? supplier.get() : null;
+ return getChannelFuture();
}
void close() {
- final ChannelFuture previous = ref.getAndSet(null);
+ final Supplier<ChannelFuture> previous = ref.getAndSet(null);
if (previous != null) {
// wait channel closed, do shutdown workerGroup
- previous.channel().close().addListener((future) ->
workerGroup.shutdownGracefully());
+ previous.get().channel().close().addListener(future ->
workerGroup.shutdownGracefully());
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index a4cc537dd..958a1ad45 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -46,6 +46,7 @@ import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServer.Division;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
@@ -275,22 +276,24 @@ public class DataStreamManagement {
this.nettyServerStreamRpcMetrics = metrics;
}
+ private CompletableFuture<DataStream> stream(RaftClientRequest request,
StateMachine stateMachine) {
+ final RequestMetrics metrics =
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
+ final Timekeeper.Context context = metrics.start();
+ return stateMachine.data().stream(request)
+ .whenComplete((r, e) -> metrics.stop(context, e == null));
+ }
+
private CompletableFuture<DataStream>
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
final Division division = server.getDivision(request.getRaftGroupId());
final ClientInvocationId invocationId =
ClientInvocationId.valueOf(request);
- final MemoizedSupplier<CompletableFuture<DataStream>> supplier =
JavaUtils.memoize(
- () -> {
- final RequestMetrics metrics =
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
- final Timekeeper.Context context = metrics.start();
- return division.getStateMachine().data().stream(request)
- .whenComplete((r, e) -> metrics.stop(context, e == null));
- });
- final CompletableFuture<DataStream> f = division.getDataStreamMap()
- .computeIfAbsent(invocationId, key -> supplier.get());
- if (!supplier.isInitialized()) {
+ final CompletableFuture<DataStream> created = new CompletableFuture<>();
+ final CompletableFuture<DataStream> returned = division.getDataStreamMap()
+ .computeIfAbsent(invocationId, key -> created);
+ if (returned != created) {
throw new AlreadyExistsException("A DataStream already exists for " +
invocationId);
}
- return f;
+ stream(request,
division.getStateMachine()).whenComplete(JavaUtils.asBiConsumer(created));
+ return created;
}
private StreamInfo newStreamInfo(ByteBuf buf,