This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a90e0e75 RATIS-1948. NettyClientStreamRpc.Connection.scheduleReconnect should check isClosed. (#977)
5a90e0e75 is described below

commit 5a90e0e75cbbf910c7abb54475a1fb886cb2c36d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Dec 4 12:51:19 2023 -0800

    RATIS-1948. NettyClientStreamRpc.Connection.scheduleReconnect should check isClosed. (#977)
---
 .../apache/ratis/client/RaftClientConfigKeys.java  |  2 +-
 .../apache/ratis/client/api/DataStreamOutput.java  |  7 +-
 .../ratis/client/impl/DataStreamClientImpl.java    |  7 +-
 .../org/apache/ratis/io/StandardWriteOption.java   |  7 +-
 .../ratis/protocol/DataStreamRequestHeader.java    |  4 +-
 .../org/apache/ratis/util/CollectionUtils.java     |  5 ++
 .../main/java/org/apache/ratis/util/JavaUtils.java | 12 +++-
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 24 +++++--
 .../ratis/netty/client/NettyClientStreamRpc.java   | 79 ++++++++++++++--------
 .../ratis/netty/server/DataStreamManagement.java   | 23 ++++++-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 17 ++++-
 .../test/java/org/apache/ratis/RaftTestUtil.java   | 17 +++--
 .../datastream/DataStreamAsyncClusterTests.java    | 48 +++++++++----
 .../ratis/datastream/DataStreamClusterTests.java   |  3 +-
 .../ratis/datastream/DataStreamTestUtils.java      |  3 +-
 15 files changed, 181 insertions(+), 77 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index d52fcab4f..7360a9cad 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -117,7 +117,7 @@ public interface RaftClientConfigKeys {
     String FLUSH_REQUEST_BYTES_MIN_KEY = PREFIX + ".flush.request.bytes.min";
     SizeInBytes FLUSH_REQUEST_BYTES_MIN_DEFAULT = SizeInBytes.ONE_MB;
     static SizeInBytes flushRequestBytesMin(RaftProperties properties) {
-      return getSizeInBytes(properties::getSizeInBytes, 
FLUSH_REQUEST_COUNT_MIN_KEY,
+      return getSizeInBytes(properties::getSizeInBytes, 
FLUSH_REQUEST_BYTES_MIN_KEY,
           FLUSH_REQUEST_BYTES_MIN_DEFAULT, getDefaultLog(), 
requireMinSizeInByte(SizeInBytes.ZERO));
     }
     static void setFlushRequestBytesMin(RaftProperties properties, SizeInBytes 
flushRequestBytesMin) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index cb5045927..f3642d1b4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -53,11 +53,10 @@ public interface DataStreamOutput extends 
CloseAsync<DataStreamReply> {
 
 
   /**
-   * The same as writeAsync(src, 0, src.length(), sync_default).
-   * where sync_default depends on the underlying implementation.
+   * The same as writeAsync(src, 0, src.length(), options).
    */
-  default CompletableFuture<DataStreamReply> writeAsync(File src) {
-    return writeAsync(src, 0, src.length());
+  default CompletableFuture<DataStreamReply> writeAsync(File src, 
WriteOption... options) {
+    return writeAsync(src, 0, src.length(), options);
   }
 
   /**
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index d184eb2ab..26d01c356 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -108,7 +108,8 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       @Override
       public int write(ByteBuffer src) throws IOException {
         final int remaining = src.remaining();
-        final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src),
+        // flush each call; otherwise the future will not be completed.
+        final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src, 
StandardWriteOption.FLUSH),
             () -> "write(" + remaining + " bytes for " + 
ClientInvocationId.valueOf(header) + ")");
         return Math.toIntExact(reply.getBytesWritten());
       }
@@ -134,7 +135,9 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       this.header = request;
       this.slidingWindow = new 
SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, 
header.getCallId()));
       final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
-      this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(), 
Collections.emptyList());
+      // TODO: RATIS-1938: In order not to auto-flush the header, remove the 
FLUSH below.
+      this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(),
+          Collections.singleton(StandardWriteOption.FLUSH));
     }
     private CompletableFuture<DataStreamReply> send(Type type, Object data, 
long length,
                                                     Iterable<WriteOption> 
options) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
index 27c91a5d8..5be7a4215 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -18,10 +18,13 @@
 package org.apache.ratis.io;
 
 public enum StandardWriteOption implements WriteOption {
-  /** Sync the data to the underlying storage. */
+  /**
+   * Sync the data to the underlying storage.
+   * Note that SYNC does not imply {@link #FLUSH}.
+   */
   SYNC,
   /** Close the data to the underlying storage. */
   CLOSE,
-  /** Flush the data out to the network. */
+  /** Flush the data out from the buffer. */
   FLUSH,
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 7cbf17ea6..9cc841c14 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -20,7 +20,7 @@ package org.apache.ratis.protocol;
 
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.apache.ratis.util.CollectionUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,7 +41,7 @@ public class DataStreamRequestHeader extends 
DataStreamPacketHeader implements D
   public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, 
long streamOffset, long dataLength,
                                  Iterable<WriteOption> options) {
     super(clientId, type, streamId, streamOffset, dataLength);
-    this.options = Collections.unmodifiableList(Lists.newArrayList(options));
+    this.options = 
Collections.unmodifiableList(CollectionUtils.distinct(options));
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index db0c6fd93..11f484608 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -168,4 +168,9 @@ public interface CollectionUtils {
     right.sort(comparator);
     return left.equals(right);
   }
+
+  /** @return a list the distinct elements. */
+  static <V> List<V> distinct(Iterable<V> elements) {
+    return StreamSupport.stream(elements.spliterator(), 
false).distinct().collect(Collectors.toList());
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index ff6f0f93d..00725903a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.util.function.CheckedFunction;
 import org.apache.ratis.util.function.CheckedRunnable;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
@@ -208,13 +209,20 @@ public interface JavaUtils {
       CheckedSupplier<RETURN, THROWABLE> supplier,
       int numAttempts, TimeDuration sleepTime, Supplier<?> name, Logger log)
       throws THROWABLE, InterruptedException {
-    Objects.requireNonNull(supplier, "supplier == null");
+    return attempt(i -> supplier.get(), numAttempts, sleepTime, name, log);
+  }
+
+  static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedFunction<Integer, RETURN, THROWABLE> attemptMethod,
+      int numAttempts, TimeDuration sleepTime, Supplier<?> name, Logger log)
+      throws THROWABLE, InterruptedException {
+    Objects.requireNonNull(attemptMethod, "attemptMethod == null");
     Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + 
numAttempts + " <= 0");
     Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + 
sleepTime + " < 0");
 
     for(int i = 1; i <= numAttempts; i++) {
       try {
-        return supplier.get();
+        return attemptMethod.apply(i);
       } catch (Throwable t) {
         if (i == numAttempts) {
           throw t;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index f51451142..bd03fefcc 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -48,18 +49,29 @@ import java.util.function.Function;
 public interface NettyDataStreamUtils {
   Logger LOG = LoggerFactory.getLogger(NettyDataStreamUtils.class);
 
+  static DataStreamPacketHeaderProto.Option getOption(WriteOption option) {
+    if (option == StandardWriteOption.FLUSH) {
+      // FLUSH is a local option which should not be included in the header.
+      return null;
+    } else if (option instanceof StandardWriteOption) {
+      return 
DataStreamPacketHeaderProto.Option.forNumber(((StandardWriteOption) 
option).ordinal());
+    }
+    throw new IllegalArgumentException("Unexpected WriteOption " + option);
+  }
+
   static ByteBuffer 
getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
-    DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
-        .newBuilder()
+    final DataStreamPacketHeaderProto.Builder b = 
DataStreamPacketHeaderProto.newBuilder()
         .setClientId(request.getClientId().toByteString())
         .setStreamId(request.getStreamId())
         .setStreamOffset(request.getStreamOffset())
         .setType(request.getType())
         .setDataLength(request.getDataLength());
-    for (WriteOption option : request.getWriteOptionList()) {
-      b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
-          ((StandardWriteOption) option).ordinal()));
-    }
+
+    request.getWriteOptionList().stream()
+        .map(NettyDataStreamUtils::getOption)
+        .filter(Objects::nonNull)
+        .forEach(b::addOptions);
+
     return DataStreamRequestHeaderProto
         .newBuilder()
         .setPacketHeader(b)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index d842346e2..17cb84aa7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -142,7 +143,7 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
     private final Supplier<ChannelInitializer<SocketChannel>> 
channelInitializerSupplier;
 
     /** The {@link ChannelFuture} is null when this connection is closed. */
-    private final AtomicReference<Supplier<ChannelFuture>> ref;
+    private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref;
 
     Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
         Supplier<ChannelInitializer<SocketChannel>> 
channelInitializerSupplier) {
@@ -175,6 +176,9 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
     }
 
     private ChannelFuture connect() {
+      if (isClosed()) {
+        return null;
+      }
       return new Bootstrap()
           .group(getWorkerGroup())
           .channel(NettyUtils.getSocketChannelClass(getWorkerGroup()))
@@ -194,6 +198,9 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
     }
 
     void scheduleReconnect(String message, Throwable cause) {
+      if (isClosed()) {
+        return;
+      }
       LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, 
address, RECONNECT);
       if (cause != null) {
         LOG.warn("", cause);
@@ -215,20 +222,22 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
       // AtomicReference.getAndUpdate may call the update function multiple 
times and discard the old objects.
       // The outer supplier creates only an inner supplier, which can be 
discarded without any leakage.
       // The inner supplier will be invoked (i.e. connect) ONLY IF it is 
successfully set to the reference.
-      final MemoizedSupplier<Supplier<ChannelFuture>> supplier = 
MemoizedSupplier.valueOf(
+      final MemoizedSupplier<MemoizedSupplier<ChannelFuture>> supplier = 
MemoizedSupplier.valueOf(
           () -> MemoizedSupplier.valueOf(this::connect));
-      final Supplier<ChannelFuture> previous = ref.getAndUpdate(prev -> prev 
== null? null: supplier.get());
-      if (previous != null) {
+      final MemoizedSupplier<ChannelFuture> previous = ref.getAndUpdate(prev 
-> prev == null? null: supplier.get());
+      if (previous != null && previous.isInitialized()) {
         previous.get().channel().close();
       }
       return getChannelFuture();
     }
 
     void close() {
-      final Supplier<ChannelFuture> previous = ref.getAndSet(null);
-      if (previous != null) {
+      final MemoizedSupplier<ChannelFuture> previous = ref.getAndSet(null);
+      if (previous != null && previous.isInitialized()) {
         // wait channel closed, do shutdown workerGroup
         previous.get().channel().close().addListener(future -> 
workerGroup.shutdownGracefully());
+      } else {
+        workerGroup.shutdownGracefully();
       }
     }
 
@@ -242,29 +251,44 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
     }
   }
 
-  class OutstandingRequests {
+  static class OutstandingRequests {
     private int count;
     private long bytes;
 
-    synchronized boolean write(DataStreamRequest request) {
-      count++;
-      bytes += request.getDataLength();
-      final List<WriteOption> options = request.getWriteOptionList();
-      final boolean isClose = options.contains(StandardWriteOption.CLOSE);
-      final boolean isFlush = options.contains(StandardWriteOption.FLUSH);
-      final boolean flush = shouldFlush(isClose || isFlush, 
flushRequestCountMin, flushRequestBytesMin);
-      LOG.debug("Stream{} outstanding: count={}, bytes={}, options={}, flush? 
{}",
-          request.getStreamId(), count, bytes, options, flush);
-      return flush;
+    private boolean shouldFlush(List<WriteOption> options, int countMin, 
SizeInBytes bytesMin) {
+      if (options.contains(StandardWriteOption.CLOSE)) {
+        // flush in order to send the CLOSE option.
+        return true;
+      } else if (bytes == 0 && count == 0) {
+        // nothing to flush (when bytes == 0 && count > 0, client may have 
written empty packets for including options)
+        return false;
+      } else {
+        return count >= countMin
+            || bytes >= bytesMin.getSize()
+            || options.contains(StandardWriteOption.FLUSH);
+      }
     }
 
-    synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes 
bytesMin) {
-      if (force || count >= countMin || bytes >= bytesMin.getSize()) {
+    synchronized boolean shouldFlush(int countMin, SizeInBytes bytesMin, 
DataStreamRequest request) {
+      final List<WriteOption> options;
+      if (request == null) {
+        options = Collections.emptyList();
+      } else {
+        options = request.getWriteOptionList();
+        count++;
+        final long length = request.getDataLength();
+        Preconditions.assertTrue(length >= 0, () -> "length = " + length + " < 
0, request: " + request);
+        bytes += length;
+      }
+
+      final boolean flush = shouldFlush(options, countMin, bytesMin);
+      LOG.debug("flush? {}, (count, bytes)=({}, {}), min=({}, {}), request={}, 
options={}",
+          flush, count, bytes, countMin, bytesMin, request, options);
+      if (flush) {
         count = 0;
         bytes = 0;
-        return true;
       }
-      return false;
+      return flush;
     }
   }
 
@@ -280,7 +304,7 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
   private final OutstandingRequests outstandingRequests = new 
OutstandingRequests();
 
   public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties 
properties) {
-    this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
+    this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + 
server.getId();
     this.requestTimeout = 
RaftClientConfigKeys.DataStream.requestTimeout(properties);
     this.closeTimeout = requestTimeout.multiply(2);
 
@@ -329,9 +353,7 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
 
       @Override
       public void channelInactive(ChannelHandlerContext ctx) {
-        if (!connection.isClosed()) {
-          connection.scheduleReconnect("channel is inactive", null);
-        }
+        connection.scheduleReconnect("channel is inactive", null);
       }
     };
   }
@@ -417,8 +439,8 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
         return f;
       }
       replyEntry = replyMap.submitRequest(requestEntry, isClose, f);
-      final Function<DataStreamRequest, ChannelFuture> writeMethod = 
outstandingRequests.write(request)?
-          channel::writeAndFlush: channel::write;
+      final Function<DataStreamRequest, ChannelFuture> writeMethod = 
outstandingRequests.shouldFlush(
+          flushRequestCountMin, flushRequestBytesMin, request)? 
channel::writeAndFlush: channel::write;
       channelFuture = writeMethod.apply(request);
     }
     channelFuture.addListener(future -> {
@@ -448,8 +470,7 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
 
   @Override
   public void close() {
-    final boolean flush = outstandingRequests.shouldFlush(true, 0, 
SizeInBytes.ZERO);
-    LOG.debug("flush? {}", flush);
+    final boolean flush = outstandingRequests.shouldFlush(0, SizeInBytes.ZERO, 
null);
     if (flush) {
       Optional.ofNullable(connection.getChannelUninterruptibly())
           .map(c -> c.writeAndFlush(EMPTY_BYTE_BUFFER))
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 695cc9645..276a365ce 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -77,9 +77,11 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DataStreamManagement {
   public static final Logger LOG = 
LoggerFactory.getLogger(DataStreamManagement.class);
@@ -119,10 +121,18 @@ public class DataStreamManagement {
       this.out = out;
     }
 
+    static Iterable<WriteOption> addFlush(List<WriteOption> original) {
+      if (original.contains(StandardWriteOption.FLUSH)) {
+        return original;
+      }
+      return Stream.concat(Stream.of(StandardWriteOption.FLUSH), 
original.stream())
+          .collect(Collectors.toList());
+    }
+
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, 
Executor executor) {
       final Timekeeper.Context context = metrics.start();
       return composeAsync(sendFuture, executor,
-          n -> out.writeAsync(request.slice().nioBuffer(), 
request.getWriteOptionList())
+          n -> out.writeAsync(request.slice().nioBuffer(), 
addFlush(request.getWriteOptionList()))
               .whenComplete((l, e) -> metrics.stop(context, e == null)));
     }
   }
@@ -255,8 +265,8 @@ public class DataStreamManagement {
 
   private final StreamMap streams = new StreamMap();
   private final ChannelMap channels;
-  private final Executor requestExecutor;
-  private final Executor writeExecutor;
+  private final ExecutorService requestExecutor;
+  private final ExecutorService writeExecutor;
 
   private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;
 
@@ -277,6 +287,13 @@ public class DataStreamManagement {
     this.nettyServerStreamRpcMetrics = metrics;
   }
 
+  void shutdown() {
+    ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, requestExecutor,
+        timeout -> LOG.warn("{}: requestExecutor shutdown timeout in {}", 
this, timeout));
+    ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, writeExecutor,
+        timeout -> LOG.warn("{}: writeExecutor shutdown timeout in {}", this, 
timeout));
+  }
+
   private CompletableFuture<DataStream> stream(RaftClientRequest request, 
StateMachine stateMachine) {
     final RequestMetrics metrics = 
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
     final Timekeeper.Context context = metrics.start();
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index b0acf2580..c5f24b058 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -301,20 +301,31 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
   @Override
   public void close() {
+    try {
+      proxies.close();
+    } catch (Exception e) {
+      LOG.error(this + ": Failed to close proxies.", e);
+    }
+
+    try {
+      requests.shutdown();
+    } catch (Exception e) {
+      LOG.error(this + ": Failed to shutdown request service.", e);
+    }
+
     try {
       channelFuture.channel().close().sync();
       bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
       workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
       ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, bossGroup,
-          timeout -> LOG.warn("{}: bossGroup shutdown timeout in " + timeout, 
this));
+          timeout -> LOG.warn("{}: bossGroup shutdown timeout in {}", this, 
timeout));
       ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, workerGroup,
-          timeout -> LOG.warn("{}: workerGroup shutdown timeout in " + 
timeout, this));
+          timeout -> LOG.warn("{}: workerGroup shutdown timeout in {}", this, 
timeout));
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOG.error(this + ": Interrupted close()", e);
     }
 
-    proxies.close();
   }
 
   @Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f54e7db53..1d45c6821 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -108,13 +108,18 @@ public interface RaftTestUtil {
       exception.set(ise);
     };
 
-    final RaftServer.Division leader = JavaUtils.attemptRepeatedly(() -> {
-      final RaftServer.Division l = cluster.getLeader(groupId, 
handleNoLeaders, handleMultipleLeaders);
-      if (l != null && !l.getInfo().isLeaderReady()) {
-        throw new IllegalStateException("Leader: "+ l.getMemberId() +  " not 
ready");
+    final RaftServer.Division leader = JavaUtils.attempt(i -> {
+      try {
+        final RaftServer.Division l = cluster.getLeader(groupId, 
handleNoLeaders, handleMultipleLeaders);
+        if (l != null && !l.getInfo().isLeaderReady()) {
+          throw new IllegalStateException("Leader: " + l.getMemberId() + " not 
ready");
+        }
+        return l;
+      } catch (Exception e) {
+        LOG.warn("Attempt #{} failed: " + e, i);
+        throw e;
       }
-      return l;
-    }, numAttempts, sleepTime, name, LOG);
+    }, numAttempts, sleepTime, () -> name, null);
 
     LOG.info(cluster.printServers(groupId));
     if (expectLeader) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 86b03a2da..8c315070e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.datastream;
 
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.netty.client.NettyClientStreamRpc;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
@@ -31,9 +31,12 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedBiFunction;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -53,6 +56,18 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER 
extends MiniRaftCluste
     return 300;
   }
 
+  @Test
+  public void testSingleStreamsMultipleServers() throws Exception {
+    Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.TRACE);
+    try {
+      runWithNewCluster(3,
+          cluster -> runTestDataStream(cluster, false,
+              (c, stepDownLeader) -> runTestDataStream(c, 1, 1, 1_000, 3, 
stepDownLeader)));
+    } finally {
+      Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.INFO);
+    }
+  }
+
   @Test
   public void testMultipleStreamsSingleServer() throws Exception {
     runWithNewCluster(1, this::runTestDataStream);
@@ -79,34 +94,37 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER 
extends MiniRaftCluste
   }
 
   void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
-    runTestDataStream(cluster, true);
+    runMultipleStreams(cluster, true);
   }
 
   void runTestDataStream(CLUSTER cluster) throws Exception {
-    runTestDataStream(cluster, false);
+    runTestDataStream(cluster, false, this::runMultipleStreams);
   }
 
-  void runTestDataStream(CLUSTER cluster, boolean stepDownLeader) throws 
Exception {
-    RaftTestUtil.waitForLeader(cluster);
-
+  long runMultipleStreams(CLUSTER cluster, boolean stepDownLeader) {
     final List<CompletableFuture<Long>> futures = new ArrayList<>();
     futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 
5, 10, 100_000, 10, stepDownLeader), executor));
     futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 
2, 20, 1_000, 5_000, stepDownLeader), executor));
-    final long maxIndex = futures.stream()
+    return futures.stream()
         .map(CompletableFuture::join)
         .max(Long::compareTo)
         .orElseThrow(IllegalStateException::new);
+  }
+
+  void runTestDataStream(CLUSTER cluster, boolean stepDownLeader, 
CheckedBiFunction<CLUSTER, Boolean, Long, Exception> runMethod) throws 
Exception {
+    RaftTestUtil.waitForLeader(cluster);
+
+    final long maxIndex = runMethod.apply(cluster, stepDownLeader);
 
     if (stepDownLeader) {
       final RaftPeerId oldLeader = cluster.getLeader().getId();
-      final CompletableFuture<RaftPeerId> changeLeader = 
futures.get(0).thenApplyAsync(dummy -> {
-        try {
-          return RaftTestUtil.changeLeader(cluster, oldLeader);
-        } catch (Exception e) {
-          throw new CompletionException("Failed to change leader from " + 
oldLeader, e);
-        }
-      });
-      LOG.info("Changed leader from {} to {}", oldLeader, changeLeader.join());
+      final RaftPeerId changed;
+      try {
+        changed = RaftTestUtil.changeLeader(cluster, oldLeader);
+      } catch (Exception e) {
+        throw new CompletionException("Failed to change leader from " + 
oldLeader, e);
+      }
+      LOG.info("Changed leader from {} to {}", oldLeader, changed);
     }
 
     // wait for all servers to catch up
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 959cb3fc0..352d98e65 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -170,7 +171,7 @@ public abstract class DataStreamClusterTests<CLUSTER 
extends MiniRaftCluster> ex
     return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
       @Override
       public void accept(DataStreamOutputImpl out) {
-        final DataStreamReply dataStreamReply = out.writeAsync(f).join();
+        final DataStreamReply dataStreamReply = out.writeAsync(f, 
StandardWriteOption.FLUSH).join();
         DataStreamTestUtils.assertSuccessReply(Type.STREAM_DATA, size, 
dataStreamReply);
       }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 7666bacd9..738cb0359 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -282,7 +282,8 @@ public interface DataStreamTestUtils {
       sizes.add(size);
 
       final ByteBuffer bf = initBuffer(dataSize, size);
-      futures.add(i == bufferNum - 1 ? out.writeAsync(bf, 
StandardWriteOption.SYNC) : out.writeAsync(bf));
+      futures.add(i == bufferNum - 1 ? out.writeAsync(bf, 
StandardWriteOption.FLUSH, StandardWriteOption.SYNC)
+          : out.writeAsync(bf));
       dataSize += size;
     }
 

Reply via email to