This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5a90e0e75 RATIS-1948. NettyClientStreamRpc.Connection.scheduleReconnect should check isClosed. (#977)
5a90e0e75 is described below
commit 5a90e0e75cbbf910c7abb54475a1fb886cb2c36d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Dec 4 12:51:19 2023 -0800
RATIS-1948. NettyClientStreamRpc.Connection.scheduleReconnect should check isClosed. (#977)
---
.../apache/ratis/client/RaftClientConfigKeys.java | 2 +-
.../apache/ratis/client/api/DataStreamOutput.java | 7 +-
.../ratis/client/impl/DataStreamClientImpl.java | 7 +-
.../org/apache/ratis/io/StandardWriteOption.java | 7 +-
.../ratis/protocol/DataStreamRequestHeader.java | 4 +-
.../org/apache/ratis/util/CollectionUtils.java | 5 ++
.../main/java/org/apache/ratis/util/JavaUtils.java | 12 +++-
.../apache/ratis/netty/NettyDataStreamUtils.java | 24 +++++--
.../ratis/netty/client/NettyClientStreamRpc.java | 79 ++++++++++++++--------
.../ratis/netty/server/DataStreamManagement.java | 23 ++++++-
.../ratis/netty/server/NettyServerStreamRpc.java | 17 ++++-
.../test/java/org/apache/ratis/RaftTestUtil.java | 17 +++--
.../datastream/DataStreamAsyncClusterTests.java | 48 +++++++++----
.../ratis/datastream/DataStreamClusterTests.java | 3 +-
.../ratis/datastream/DataStreamTestUtils.java | 3 +-
15 files changed, 181 insertions(+), 77 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index d52fcab4f..7360a9cad 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -117,7 +117,7 @@ public interface RaftClientConfigKeys {
String FLUSH_REQUEST_BYTES_MIN_KEY = PREFIX + ".flush.request.bytes.min";
SizeInBytes FLUSH_REQUEST_BYTES_MIN_DEFAULT = SizeInBytes.ONE_MB;
static SizeInBytes flushRequestBytesMin(RaftProperties properties) {
- return getSizeInBytes(properties::getSizeInBytes,
FLUSH_REQUEST_COUNT_MIN_KEY,
+ return getSizeInBytes(properties::getSizeInBytes,
FLUSH_REQUEST_BYTES_MIN_KEY,
FLUSH_REQUEST_BYTES_MIN_DEFAULT, getDefaultLog(),
requireMinSizeInByte(SizeInBytes.ZERO));
}
static void setFlushRequestBytesMin(RaftProperties properties, SizeInBytes
flushRequestBytesMin) {
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index cb5045927..f3642d1b4 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -53,11 +53,10 @@ public interface DataStreamOutput extends
CloseAsync<DataStreamReply> {
/**
- * The same as writeAsync(src, 0, src.length(), sync_default).
- * where sync_default depends on the underlying implementation.
+ * The same as writeAsync(src, 0, src.length(), options).
*/
- default CompletableFuture<DataStreamReply> writeAsync(File src) {
- return writeAsync(src, 0, src.length());
+ default CompletableFuture<DataStreamReply> writeAsync(File src,
WriteOption... options) {
+ return writeAsync(src, 0, src.length(), options);
}
/**
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index d184eb2ab..26d01c356 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -108,7 +108,8 @@ public class DataStreamClientImpl implements
DataStreamClient {
@Override
public int write(ByteBuffer src) throws IOException {
final int remaining = src.remaining();
- final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src),
+ // flush each call; otherwise the future will not be completed.
+ final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src,
StandardWriteOption.FLUSH),
() -> "write(" + remaining + " bytes for " +
ClientInvocationId.valueOf(header) + ")");
return Math.toIntExact(reply.getBytesWritten());
}
@@ -134,7 +135,9 @@ public class DataStreamClientImpl implements
DataStreamClient {
this.header = request;
this.slidingWindow = new
SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId,
header.getCallId()));
final ByteBuffer buffer =
ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
- this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(),
Collections.emptyList());
+ // TODO: RATIS-1938: In order not to auto-flush the header, remove the
FLUSH below.
+ this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(),
+ Collections.singleton(StandardWriteOption.FLUSH));
}
private CompletableFuture<DataStreamReply> send(Type type, Object data,
long length,
Iterable<WriteOption>
options) {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
index 27c91a5d8..5be7a4215 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -18,10 +18,13 @@
package org.apache.ratis.io;
public enum StandardWriteOption implements WriteOption {
- /** Sync the data to the underlying storage. */
+ /**
+ * Sync the data to the underlying storage.
+ * Note that SYNC does not imply {@link #FLUSH}.
+ */
SYNC,
/** Close the data to the underlying storage. */
CLOSE,
- /** Flush the data out to the network. */
+ /** Flush the data out from the buffer. */
FLUSH,
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 7cbf17ea6..9cc841c14 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -20,7 +20,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.apache.ratis.util.CollectionUtils;
import java.util.Arrays;
import java.util.Collections;
@@ -41,7 +41,7 @@ public class DataStreamRequestHeader extends
DataStreamPacketHeader implements D
public DataStreamRequestHeader(ClientId clientId, Type type, long streamId,
long streamOffset, long dataLength,
Iterable<WriteOption> options) {
super(clientId, type, streamId, streamOffset, dataLength);
- this.options = Collections.unmodifiableList(Lists.newArrayList(options));
+ this.options =
Collections.unmodifiableList(CollectionUtils.distinct(options));
}
@Override
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index db0c6fd93..11f484608 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -168,4 +168,9 @@ public interface CollectionUtils {
right.sort(comparator);
return left.equals(right);
}
+
+ /** @return a list of the distinct elements. */
+ static <V> List<V> distinct(Iterable<V> elements) {
+ return StreamSupport.stream(elements.spliterator(),
false).distinct().collect(Collectors.toList());
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index ff6f0f93d..00725903a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.util.function.CheckedFunction;
import org.apache.ratis.util.function.CheckedRunnable;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
@@ -208,13 +209,20 @@ public interface JavaUtils {
CheckedSupplier<RETURN, THROWABLE> supplier,
int numAttempts, TimeDuration sleepTime, Supplier<?> name, Logger log)
throws THROWABLE, InterruptedException {
- Objects.requireNonNull(supplier, "supplier == null");
+ return attempt(i -> supplier.get(), numAttempts, sleepTime, name, log);
+ }
+
+ static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+ CheckedFunction<Integer, RETURN, THROWABLE> attemptMethod,
+ int numAttempts, TimeDuration sleepTime, Supplier<?> name, Logger log)
+ throws THROWABLE, InterruptedException {
+ Objects.requireNonNull(attemptMethod, "attemptMethod == null");
Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " +
numAttempts + " <= 0");
Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " +
sleepTime + " < 0");
for(int i = 1; i <= numAttempts; i++) {
try {
- return supplier.get();
+ return attemptMethod.apply(i);
} catch (Throwable t) {
if (i == numAttempts) {
throw t;
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index f51451142..bd03fefcc 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -48,18 +49,29 @@ import java.util.function.Function;
public interface NettyDataStreamUtils {
Logger LOG = LoggerFactory.getLogger(NettyDataStreamUtils.class);
+ static DataStreamPacketHeaderProto.Option getOption(WriteOption option) {
+ if (option == StandardWriteOption.FLUSH) {
+ // FLUSH is a local option which should not be included in the header.
+ return null;
+ } else if (option instanceof StandardWriteOption) {
+ return
DataStreamPacketHeaderProto.Option.forNumber(((StandardWriteOption)
option).ordinal());
+ }
+ throw new IllegalArgumentException("Unexpected WriteOption " + option);
+ }
+
static ByteBuffer
getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
- DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
- .newBuilder()
+ final DataStreamPacketHeaderProto.Builder b =
DataStreamPacketHeaderProto.newBuilder()
.setClientId(request.getClientId().toByteString())
.setStreamId(request.getStreamId())
.setStreamOffset(request.getStreamOffset())
.setType(request.getType())
.setDataLength(request.getDataLength());
- for (WriteOption option : request.getWriteOptionList()) {
- b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
- ((StandardWriteOption) option).ordinal()));
- }
+
+ request.getWriteOptionList().stream()
+ .map(NettyDataStreamUtils::getOption)
+ .filter(Objects::nonNull)
+ .forEach(b::addOptions);
+
return DataStreamRequestHeaderProto
.newBuilder()
.setPacketHeader(b)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index d842346e2..17cb84aa7 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -142,7 +143,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private final Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier;
/** The {@link ChannelFuture} is null when this connection is closed. */
- private final AtomicReference<Supplier<ChannelFuture>> ref;
+ private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref;
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier) {
@@ -175,6 +176,9 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
private ChannelFuture connect() {
+ if (isClosed()) {
+ return null;
+ }
return new Bootstrap()
.group(getWorkerGroup())
.channel(NettyUtils.getSocketChannelClass(getWorkerGroup()))
@@ -194,6 +198,9 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
void scheduleReconnect(String message, Throwable cause) {
+ if (isClosed()) {
+ return;
+ }
LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message,
address, RECONNECT);
if (cause != null) {
LOG.warn("", cause);
@@ -215,20 +222,22 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
// AtomicReference.getAndUpdate may call the update function multiple
times and discard the old objects.
// The outer supplier creates only an inner supplier, which can be
discarded without any leakage.
// The inner supplier will be invoked (i.e. connect) ONLY IF it is
successfully set to the reference.
- final MemoizedSupplier<Supplier<ChannelFuture>> supplier =
MemoizedSupplier.valueOf(
+ final MemoizedSupplier<MemoizedSupplier<ChannelFuture>> supplier =
MemoizedSupplier.valueOf(
() -> MemoizedSupplier.valueOf(this::connect));
- final Supplier<ChannelFuture> previous = ref.getAndUpdate(prev -> prev
== null? null: supplier.get());
- if (previous != null) {
+ final MemoizedSupplier<ChannelFuture> previous = ref.getAndUpdate(prev
-> prev == null? null: supplier.get());
+ if (previous != null && previous.isInitialized()) {
previous.get().channel().close();
}
return getChannelFuture();
}
void close() {
- final Supplier<ChannelFuture> previous = ref.getAndSet(null);
- if (previous != null) {
+ final MemoizedSupplier<ChannelFuture> previous = ref.getAndSet(null);
+ if (previous != null && previous.isInitialized()) {
// wait channel closed, do shutdown workerGroup
previous.get().channel().close().addListener(future ->
workerGroup.shutdownGracefully());
+ } else {
+ workerGroup.shutdownGracefully();
}
}
@@ -242,29 +251,44 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
}
- class OutstandingRequests {
+ static class OutstandingRequests {
private int count;
private long bytes;
- synchronized boolean write(DataStreamRequest request) {
- count++;
- bytes += request.getDataLength();
- final List<WriteOption> options = request.getWriteOptionList();
- final boolean isClose = options.contains(StandardWriteOption.CLOSE);
- final boolean isFlush = options.contains(StandardWriteOption.FLUSH);
- final boolean flush = shouldFlush(isClose || isFlush,
flushRequestCountMin, flushRequestBytesMin);
- LOG.debug("Stream{} outstanding: count={}, bytes={}, options={}, flush?
{}",
- request.getStreamId(), count, bytes, options, flush);
- return flush;
+ private boolean shouldFlush(List<WriteOption> options, int countMin,
SizeInBytes bytesMin) {
+ if (options.contains(StandardWriteOption.CLOSE)) {
+ // flush in order to send the CLOSE option.
+ return true;
+ } else if (bytes == 0 && count == 0) {
+ // nothing to flush (when bytes == 0 && count > 0, client may have
written empty packets for including options)
+ return false;
+ } else {
+ return count >= countMin
+ || bytes >= bytesMin.getSize()
+ || options.contains(StandardWriteOption.FLUSH);
+ }
}
- synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes
bytesMin) {
- if (force || count >= countMin || bytes >= bytesMin.getSize()) {
+ synchronized boolean shouldFlush(int countMin, SizeInBytes bytesMin,
DataStreamRequest request) {
+ final List<WriteOption> options;
+ if (request == null) {
+ options = Collections.emptyList();
+ } else {
+ options = request.getWriteOptionList();
+ count++;
+ final long length = request.getDataLength();
+ Preconditions.assertTrue(length >= 0, () -> "length = " + length + " <
0, request: " + request);
+ bytes += length;
+ }
+
+ final boolean flush = shouldFlush(options, countMin, bytesMin);
+ LOG.debug("flush? {}, (count, bytes)=({}, {}), min=({}, {}), request={},
options={}",
+ flush, count, bytes, countMin, bytesMin, request, options);
+ if (flush) {
count = 0;
bytes = 0;
- return true;
}
- return false;
+ return flush;
}
}
@@ -280,7 +304,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
private final OutstandingRequests outstandingRequests = new
OutstandingRequests();
public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties
properties) {
- this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
+ this.name = JavaUtils.getClassSimpleName(getClass()) + "->" +
server.getId();
this.requestTimeout =
RaftClientConfigKeys.DataStream.requestTimeout(properties);
this.closeTimeout = requestTimeout.multiply(2);
@@ -329,9 +353,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
- if (!connection.isClosed()) {
- connection.scheduleReconnect("channel is inactive", null);
- }
+ connection.scheduleReconnect("channel is inactive", null);
}
};
}
@@ -417,8 +439,8 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
return f;
}
replyEntry = replyMap.submitRequest(requestEntry, isClose, f);
- final Function<DataStreamRequest, ChannelFuture> writeMethod =
outstandingRequests.write(request)?
- channel::writeAndFlush: channel::write;
+ final Function<DataStreamRequest, ChannelFuture> writeMethod =
outstandingRequests.shouldFlush(
+ flushRequestCountMin, flushRequestBytesMin, request)?
channel::writeAndFlush: channel::write;
channelFuture = writeMethod.apply(request);
}
channelFuture.addListener(future -> {
@@ -448,8 +470,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
@Override
public void close() {
- final boolean flush = outstandingRequests.shouldFlush(true, 0,
SizeInBytes.ZERO);
- LOG.debug("flush? {}", flush);
+ final boolean flush = outstandingRequests.shouldFlush(0, SizeInBytes.ZERO,
null);
if (flush) {
Optional.ofNullable(connection.getChannelUninterruptibly())
.map(c -> c.writeAndFlush(EMPTY_BYTE_BUFFER))
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 695cc9645..276a365ce 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -77,9 +77,11 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DataStreamManagement {
public static final Logger LOG =
LoggerFactory.getLogger(DataStreamManagement.class);
@@ -119,10 +121,18 @@ public class DataStreamManagement {
this.out = out;
}
+ static Iterable<WriteOption> addFlush(List<WriteOption> original) {
+ if (original.contains(StandardWriteOption.FLUSH)) {
+ return original;
+ }
+ return Stream.concat(Stream.of(StandardWriteOption.FLUSH),
original.stream())
+ .collect(Collectors.toList());
+ }
+
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request,
Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(sendFuture, executor,
- n -> out.writeAsync(request.slice().nioBuffer(),
request.getWriteOptionList())
+ n -> out.writeAsync(request.slice().nioBuffer(),
addFlush(request.getWriteOptionList()))
.whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
@@ -255,8 +265,8 @@ public class DataStreamManagement {
private final StreamMap streams = new StreamMap();
private final ChannelMap channels;
- private final Executor requestExecutor;
- private final Executor writeExecutor;
+ private final ExecutorService requestExecutor;
+ private final ExecutorService writeExecutor;
private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;
@@ -277,6 +287,13 @@ public class DataStreamManagement {
this.nettyServerStreamRpcMetrics = metrics;
}
+ void shutdown() {
+ ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, requestExecutor,
+ timeout -> LOG.warn("{}: requestExecutor shutdown timeout in {}",
this, timeout));
+ ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, writeExecutor,
+ timeout -> LOG.warn("{}: writeExecutor shutdown timeout in {}", this,
timeout));
+ }
+
private CompletableFuture<DataStream> stream(RaftClientRequest request,
StateMachine stateMachine) {
final RequestMetrics metrics =
getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
final Timekeeper.Context context = metrics.start();
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index b0acf2580..c5f24b058 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -301,20 +301,31 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
@Override
public void close() {
+ try {
+ proxies.close();
+ } catch (Exception e) {
+ LOG.error(this + ": Failed to close proxies.", e);
+ }
+
+ try {
+ requests.shutdown();
+ } catch (Exception e) {
+ LOG.error(this + ": Failed to shutdown request service.", e);
+ }
+
try {
channelFuture.channel().close().sync();
bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, bossGroup,
- timeout -> LOG.warn("{}: bossGroup shutdown timeout in " + timeout,
this));
+ timeout -> LOG.warn("{}: bossGroup shutdown timeout in {}", this,
timeout));
ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, workerGroup,
- timeout -> LOG.warn("{}: workerGroup shutdown timeout in " +
timeout, this));
+ timeout -> LOG.warn("{}: workerGroup shutdown timeout in {}", this,
timeout));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error(this + ": Interrupted close()", e);
}
- proxies.close();
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f54e7db53..1d45c6821 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -108,13 +108,18 @@ public interface RaftTestUtil {
exception.set(ise);
};
- final RaftServer.Division leader = JavaUtils.attemptRepeatedly(() -> {
- final RaftServer.Division l = cluster.getLeader(groupId,
handleNoLeaders, handleMultipleLeaders);
- if (l != null && !l.getInfo().isLeaderReady()) {
- throw new IllegalStateException("Leader: "+ l.getMemberId() + " not
ready");
+ final RaftServer.Division leader = JavaUtils.attempt(i -> {
+ try {
+ final RaftServer.Division l = cluster.getLeader(groupId,
handleNoLeaders, handleMultipleLeaders);
+ if (l != null && !l.getInfo().isLeaderReady()) {
+ throw new IllegalStateException("Leader: " + l.getMemberId() + " not
ready");
+ }
+ return l;
+ } catch (Exception e) {
+ LOG.warn("Attempt #{} failed: " + e, i);
+ throw e;
}
- return l;
- }, numAttempts, sleepTime, name, LOG);
+ }, numAttempts, sleepTime, () -> name, null);
LOG.info(cluster.printServers(groupId));
if (expectLeader) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 86b03a2da..8c315070e 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.datastream;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.netty.client.NettyClientStreamRpc;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
@@ -31,9 +31,12 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedBiFunction;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.event.Level;
import java.io.IOException;
import java.util.ArrayList;
@@ -53,6 +56,18 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER
extends MiniRaftCluste
return 300;
}
+ @Test
+ public void testSingleStreamsMultipleServers() throws Exception {
+ Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.TRACE);
+ try {
+ runWithNewCluster(3,
+ cluster -> runTestDataStream(cluster, false,
+ (c, stepDownLeader) -> runTestDataStream(c, 1, 1, 1_000, 3,
stepDownLeader)));
+ } finally {
+ Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.INFO);
+ }
+ }
+
@Test
public void testMultipleStreamsSingleServer() throws Exception {
runWithNewCluster(1, this::runTestDataStream);
@@ -79,34 +94,37 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER
extends MiniRaftCluste
}
void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
- runTestDataStream(cluster, true);
+ runMultipleStreams(cluster, true);
}
void runTestDataStream(CLUSTER cluster) throws Exception {
- runTestDataStream(cluster, false);
+ runTestDataStream(cluster, false, this::runMultipleStreams);
}
- void runTestDataStream(CLUSTER cluster, boolean stepDownLeader) throws
Exception {
- RaftTestUtil.waitForLeader(cluster);
-
+ long runMultipleStreams(CLUSTER cluster, boolean stepDownLeader) {
final List<CompletableFuture<Long>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
5, 10, 100_000, 10, stepDownLeader), executor));
futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
2, 20, 1_000, 5_000, stepDownLeader), executor));
- final long maxIndex = futures.stream()
+ return futures.stream()
.map(CompletableFuture::join)
.max(Long::compareTo)
.orElseThrow(IllegalStateException::new);
+ }
+
+ void runTestDataStream(CLUSTER cluster, boolean stepDownLeader,
CheckedBiFunction<CLUSTER, Boolean, Long, Exception> runMethod) throws
Exception {
+ RaftTestUtil.waitForLeader(cluster);
+
+ final long maxIndex = runMethod.apply(cluster, stepDownLeader);
if (stepDownLeader) {
final RaftPeerId oldLeader = cluster.getLeader().getId();
- final CompletableFuture<RaftPeerId> changeLeader =
futures.get(0).thenApplyAsync(dummy -> {
- try {
- return RaftTestUtil.changeLeader(cluster, oldLeader);
- } catch (Exception e) {
- throw new CompletionException("Failed to change leader from " +
oldLeader, e);
- }
- });
- LOG.info("Changed leader from {} to {}", oldLeader, changeLeader.join());
+ final RaftPeerId changed;
+ try {
+ changed = RaftTestUtil.changeLeader(cluster, oldLeader);
+ } catch (Exception e) {
+ throw new CompletionException("Failed to change leader from " +
oldLeader, e);
+ }
+ LOG.info("Changed leader from {} to {}", oldLeader, changed);
}
// wait for all servers to catch up
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 959cb3fc0..352d98e65 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -170,7 +171,7 @@ public abstract class DataStreamClusterTests<CLUSTER
extends MiniRaftCluster> ex
return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
@Override
public void accept(DataStreamOutputImpl out) {
- final DataStreamReply dataStreamReply = out.writeAsync(f).join();
+ final DataStreamReply dataStreamReply = out.writeAsync(f,
StandardWriteOption.FLUSH).join();
DataStreamTestUtils.assertSuccessReply(Type.STREAM_DATA, size,
dataStreamReply);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 7666bacd9..738cb0359 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -282,7 +282,8 @@ public interface DataStreamTestUtils {
sizes.add(size);
final ByteBuffer bf = initBuffer(dataSize, size);
- futures.add(i == bufferNum - 1 ? out.writeAsync(bf,
StandardWriteOption.SYNC) : out.writeAsync(bf));
+ futures.add(i == bufferNum - 1 ? out.writeAsync(bf,
StandardWriteOption.FLUSH, StandardWriteOption.SYNC)
+ : out.writeAsync(bf));
dataSize += size;
}