This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit d8cf2bd89fbbf3c32f298267f24bd8daeecb609b Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Oct 19 17:06:57 2022 +0800 RATIS-1157. Buffer packets when the size of packets are too small. (#748) (cherry picked from commit cf0dce981cb3dccb0501e7648f348b6f8634c1f2) --- .../apache/ratis/client/RaftClientConfigKeys.java | 22 +++++++- .../ratis/client/impl/DataStreamClientImpl.java | 5 -- .../ratis/client/impl/OrderedStreamAsync.java | 2 +- .../impl/DataStreamRequestByteBuffer.java | 10 ++-- .../impl/DataStreamRequestFilePositionCount.java | 8 +-- .../org/apache/ratis/io/StandardWriteOption.java | 4 +- .../main/java/org/apache/ratis/io/WriteOption.java | 4 +- .../apache/ratis/protocol/DataStreamRequest.java | 8 ++- .../ratis/protocol/DataStreamRequestHeader.java | 2 +- .../java/org/apache/ratis/util/SizeInBytes.java | 1 + .../apache/ratis/netty/NettyDataStreamUtils.java | 11 +++- .../ratis/netty/client/NettyClientStreamRpc.java | 66 +++++++++++++++++++++- .../ratis/netty/server/DataStreamManagement.java | 10 +--- .../netty/server/DataStreamRequestByteBuf.java | 8 +-- .../datastream/DataStreamAsyncClusterTests.java | 2 +- ...NettyDataStreamStarTopologyWithGrpcCluster.java | 22 ++++++-- 16 files changed, 142 insertions(+), 43 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index 0e950ee2e..d52fcab4f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -95,7 +95,7 @@ public interface RaftClientConfigKeys { String PREFIX = RaftClientConfigKeys.PREFIX + ".data-stream"; String OUTSTANDING_REQUESTS_MAX_KEY = PREFIX + ".outstanding-requests.max"; - int OUTSTANDING_REQUESTS_MAX_DEFAULT = 10; + int OUTSTANDING_REQUESTS_MAX_DEFAULT = 100; static int outstandingRequestsMax(RaftProperties properties) { return getInt(properties::getInt, OUTSTANDING_REQUESTS_MAX_KEY, OUTSTANDING_REQUESTS_MAX_DEFAULT, getDefaultLog(), requireMin(2)); @@ -104,6 +104,26 @@ public interface RaftClientConfigKeys { setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY, outstandingRequests); } + String FLUSH_REQUEST_COUNT_MIN_KEY = PREFIX + ".flush.request.count.min"; + int FLUSH_REQUEST_COUNT_MIN_DEFAULT = 0; + static int flushRequestCountMin(RaftProperties properties) { + return getInt(properties::getInt, FLUSH_REQUEST_COUNT_MIN_KEY, + FLUSH_REQUEST_COUNT_MIN_DEFAULT, getDefaultLog(), requireMin(0)); + } + static void setFlushRequestCountMin(RaftProperties properties, int flushRequestCountMin) { + setInt(properties::setInt, FLUSH_REQUEST_COUNT_MIN_KEY, flushRequestCountMin); + } + + String FLUSH_REQUEST_BYTES_MIN_KEY = PREFIX + ".flush.request.bytes.min"; + SizeInBytes FLUSH_REQUEST_BYTES_MIN_DEFAULT = SizeInBytes.ONE_MB; + static SizeInBytes flushRequestBytesMin(RaftProperties properties) { + return getSizeInBytes(properties::getSizeInBytes, FLUSH_REQUEST_COUNT_MIN_KEY, + FLUSH_REQUEST_BYTES_MIN_DEFAULT, getDefaultLog(), requireMinSizeInByte(SizeInBytes.ZERO)); + } + static void setFlushRequestBytesMin(RaftProperties properties, SizeInBytes flushRequestBytesMin) { + setSizeInBytes(properties::set, FLUSH_REQUEST_BYTES_MIN_KEY, flushRequestBytesMin); + } + String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout"; TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(10000, TimeUnit.MILLISECONDS); static TimeDuration requestTimeout(RaftProperties properties) { diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java index b944d25e5..44b346978 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java @@ -136,11 +136,6 @@ public class DataStreamClientImpl implements DataStreamClient { return f; } - @Override - public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) { - return writeAsync(src, Arrays.asList(options)); - } - @Override public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) { return writeAsyncImpl(src, src.remaining(), options); diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java index 8683a7f20..0515b146a 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java @@ -148,7 +148,7 @@ public class OrderedStreamAsync { request.getDataStreamRequest()); long seqNum = request.getSeqNum(); - final boolean isClose = StandardWriteOption.CLOSE.isOneOf(request.getDataStreamRequest().getWriteOptions()); + final boolean isClose = request.getDataStreamRequest().getWriteOptionList().contains(StandardWriteOption.CLOSE); scheduleWithTimeout(request, isClose? closeTimeout: requestTimeout); requestFuture.thenApply(reply -> { diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java index 282b4f928..938ed793b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java +++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java @@ -20,29 +20,27 @@ package org.apache.ratis.datastream.impl; import org.apache.ratis.io.WriteOption; import org.apache.ratis.protocol.DataStreamRequest; import org.apache.ratis.protocol.DataStreamRequestHeader; -import org.apache.ratis.thirdparty.com.google.common.collect.Lists; import org.apache.ratis.util.Preconditions; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; /** * Implements {@link DataStreamRequest} with {@link ByteBuffer}. - * + * <p> * This class is immutable. */ public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest { - private List<WriteOption> options; + private final List<WriteOption> options; public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) { super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), buffer); - this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions())); + this.options = header.getWriteOptionList(); Preconditions.assertTrue(header.getDataLength() == buffer.remaining()); } @Override - public List<WriteOption> getWriteOptions() { + public List<WriteOption> getWriteOptionList() { return options; } } diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java index b1fb73620..ceb84ff6e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java +++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java @@ -21,14 +21,12 @@ import org.apache.ratis.io.FilePositionCount; import org.apache.ratis.io.WriteOption; import org.apache.ratis.protocol.DataStreamRequest; import org.apache.ratis.protocol.DataStreamRequestHeader; -import org.apache.ratis.thirdparty.com.google.common.collect.Lists; -import java.util.Collections; import java.util.List; /** * Implements {@link DataStreamRequest} with {@link FilePositionCount}. - * + * <p> * This class is immutable. */ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest { @@ -37,7 +35,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) { super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset()); - this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions())); + this.options = header.getWriteOptionList(); this.file = file; } @@ -52,7 +50,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp } @Override - public List<WriteOption> getWriteOptions() { + public List<WriteOption> getWriteOptionList() { return options; } } diff --git a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java index 0aae8f9b9..27c91a5d8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java +++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java @@ -21,5 +21,7 @@ public enum StandardWriteOption implements WriteOption { /** Sync the data to the underlying storage. */ SYNC, /** Close the data to the underlying storage. */ - CLOSE + CLOSE, + /** Flush the data out to the network. */ + FLUSH, } diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java index b5f29a2f2..a734f0dab 100644 --- a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java +++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java @@ -20,6 +20,8 @@ package org.apache.ratis.io; import java.util.Arrays; public interface WriteOption { + WriteOption[] EMPTY_ARRAY = {}; + static boolean containsOption(Iterable<WriteOption> options, WriteOption target) { for (WriteOption option : options) { @@ -36,7 +38,7 @@ public interface WriteOption { return containsOption(Arrays.asList(options), target); } - default boolean isOneOf(Iterable<WriteOption> options) { + default boolean isOneOf(WriteOption... options) { return containsOption(options, this); } } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java index cde07c415..36d0d8b63 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java @@ -22,5 +22,11 @@ import org.apache.ratis.io.WriteOption; import java.util.List; public interface DataStreamRequest extends DataStreamPacket { - List<WriteOption> getWriteOptions(); + List<WriteOption> getWriteOptionList(); + + /** @deprecated use {@link #getWriteOptionList()}. */ + @Deprecated + default WriteOption[] getWriteOptions() { + return getWriteOptionList().toArray(WriteOption.EMPTY_ARRAY); + } } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java index a0c68eff1..7cbf17ea6 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java @@ -45,7 +45,7 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D } @Override - public List<WriteOption> getWriteOptions() { + public List<WriteOption> getWriteOptionList() { return options; } } \ No newline at end of file diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java index 25667a378..683f0da62 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java @@ -23,6 +23,7 @@ import java.util.Objects; * Size which may be constructed with a {@link TraditionalBinaryPrefix}. */ public final class SizeInBytes { + public static final SizeInBytes ZERO = valueOf(0); public static final SizeInBytes ONE_KB = valueOf("1k"); public static final SizeInBytes ONE_MB = valueOf("1m"); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java index 0f595c7b4..10f35157c 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java @@ -56,7 +56,7 @@ public interface NettyDataStreamUtils { .setStreamOffset(request.getStreamOffset()) .setType(request.getType()) .setDataLength(request.getDataLength()); - for (WriteOption option : request.getWriteOptions()) { + for (WriteOption option : request.getWriteOptionList()) { b.addOptions(DataStreamPacketHeaderProto.Option.forNumber( ((StandardWriteOption) option).ordinal())); } @@ -105,7 +105,14 @@ public interface NettyDataStreamUtils { static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<Object> out, ByteBufAllocator allocator) { encodeDataStreamRequestHeader(request, out, allocator); - out.accept(Unpooled.wrappedBuffer(request.slice())); + encodeByteBuffer(request.slice(), out); + } + + static void encodeByteBuffer(ByteBuffer buffer, Consumer<Object> out) { + if (buffer.remaining() == 0) { + return; + } + out.accept(Unpooled.wrappedBuffer(buffer)); } static void encodeDataStreamRequestFilePositionCount( diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index a03013748..51326d13e 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -19,9 +19,12 @@ package org.apache.ratis.netty.client; import org.apache.ratis.client.DataStreamClientRpc; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer; import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount; +import org.apache.ratis.io.StandardWriteOption; +import org.apache.ratis.io.WriteOption; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyDataStreamUtils; import org.apache.ratis.netty.NettyUtils; @@ -51,6 +54,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; @@ -58,6 +62,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -68,8 +73,11 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER; + public class NettyClientStreamRpc implements DataStreamClientRpc { public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class); @@ -238,9 +246,39 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { } } + class OutstandingRequests { + private int count; + private long bytes; + + synchronized boolean write(DataStreamRequest request) { + count++; + bytes += request.getDataLength(); + final List<WriteOption> options = request.getWriteOptionList(); + final boolean isClose = options.contains(StandardWriteOption.CLOSE); + final boolean isFlush = options.contains(StandardWriteOption.FLUSH); + final boolean flush = shouldFlush(isClose || isFlush, flushRequestCountMin, flushRequestBytesMin); + LOG.debug("Stream{} outstanding: count={}, bytes={}, options={}, flush? {}", + request.getStreamId(), count, bytes, options, flush); + return flush; + } + + synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes bytesMin) { + if (force || count >= countMin || bytes >= bytesMin.getSize()) { + count = 0; + bytes = 0; + return true; + } + return false; + } + } + private final String name; private final Connection connection; + private final int flushRequestCountMin; + private final SizeInBytes flushRequestBytesMin; + private final OutstandingRequests outstandingRequests = new OutstandingRequests(); + private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>(); private final TimeDuration replyQueueGracePeriod; private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance(); @@ -248,6 +286,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) { this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server; this.replyQueueGracePeriod = NettyConfigKeys.DataStream.Client.replyQueueGracePeriod(properties); + this.flushRequestCountMin = RaftClientConfigKeys.DataStream.flushRequestCountMin(properties); + this.flushRequestBytesMin = RaftClientConfigKeys.DataStream.flushRequestBytesMin(properties); final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress()); final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf); @@ -307,7 +347,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { if (!connection.isClosed()) { connection.scheduleReconnect("channel is inactive", null); } @@ -326,6 +366,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { } p.addLast(newEncoder()); p.addLast(newEncoderDataStreamRequestFilePositionCount()); + p.addLast(newEncoderByteBuffer()); p.addLast(newDecoder()); p.addLast(handler); } @@ -350,6 +391,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { }; } + static MessageToMessageEncoder<ByteBuffer> newEncoderByteBuffer() { + return new MessageToMessageEncoder<ByteBuffer>() { + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuffer request, List<Object> out) { + NettyDataStreamUtils.encodeByteBuffer(request, out::add); + } + }; + } + static ByteToMessageDecoder newDecoder() { return new ByteToMessageDecoder() { { @@ -378,7 +428,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { return f; } LOG.debug("{}: write {}", this, request); - channel.writeAndFlush(request).addListener(future -> { + final Function<DataStreamRequest, ChannelFuture> writeMethod = outstandingRequests.write(request)? + channel::writeAndFlush: channel::write; + writeMethod.apply(request).addListener(future -> { if (!future.isSuccess()) { final IOException e = new IOException(this + ": Failed to send " + request, future.cause()); LOG.error("Channel write failed", e); @@ -390,7 +442,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { @Override public void close() { - connection.close(); + final boolean flush = outstandingRequests.shouldFlush(true, 0, SizeInBytes.ZERO); + LOG.debug("flush? {}", flush); + if (flush) { + Optional.ofNullable(connection.getChannelUninterruptibly()) + .map(c -> c.writeAndFlush(EMPTY_BYTE_BUFFER)) + .ifPresent(f -> f.addListener(dummy -> connection.close())); + } else { + connection.close(); + } } @Override diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 3a8f0fcba..27e7eb99b 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -114,8 +114,7 @@ public class DataStreamManagement { CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) { final RequestContext context = metrics.start(); return composeAsync(sendFuture, executor, - n -> out.writeAsync(request.slice().nioBuffer(), - request.getWriteOptions()) + n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptionList()) .whenComplete((l, e) -> metrics.stop(context, e == null))); } } @@ -419,8 +418,7 @@ public class DataStreamManagement { private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) { - boolean close = WriteOption.containsOption(request.getWriteOptions(), - StandardWriteOption.CLOSE); + final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE); ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()); final StreamInfo info; if (request.getType() == Type.STREAM_HEADER) { @@ -449,9 +447,7 @@ public class DataStreamManagement { localWrite = CompletableFuture.completedFuture(0L); remoteWrites = Collections.emptyList(); } else if (request.getType() == Type.STREAM_DATA) { - localWrite = info.getLocal().write(buf, - request.getWriteOptions(), - writeExecutor); + localWrite = info.getLocal().write(buf, request.getWriteOptionList(), writeExecutor); remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor)); } else { throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java index 84803eb0e..0dcd46e02 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java @@ -33,7 +33,7 @@ import java.util.List; /** * Implements {@link DataStreamRequest} with {@link ByteBuf}. - * + * <p> * This class is immutable. */ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest { @@ -41,7 +41,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da private final List<WriteOption> options; public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, - List<WriteOption> options, ByteBuf buf) { + Iterable<WriteOption> options, ByteBuf buf) { super(clientId, type, streamId, streamOffset); this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER; this.options = Collections.unmodifiableList(Lists.newArrayList(options)); @@ -49,7 +49,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) { this(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), - header.getWriteOptions(), buf); + header.getWriteOptionList(), buf); } @Override @@ -62,7 +62,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da } @Override - public List<WriteOption> getWriteOptions() { + public List<WriteOption> getWriteOptionList() { return options; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java index a9b691dbb..b77b9719c 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java @@ -90,7 +90,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste RaftTestUtil.waitForLeader(cluster); final List<CompletableFuture<Long>> futures = new ArrayList<>(); - futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10, stepDownLeader), executor)); + futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 100_000, 10, stepDownLeader), executor)); futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 5_000, stepDownLeader), executor)); final long maxIndex = futures.stream() .map(CompletableFuture::join) diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java index cd6bbc75f..14c62b74f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java @@ -17,9 +17,14 @@ */ package org.apache.ratis.datastream; +import org.apache.ratis.client.RaftClientConfigKeys; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RoutingTable; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.junit.Before; import java.util.Collection; import java.util.List; @@ -29,13 +34,22 @@ public class TestNettyDataStreamStarTopologyWithGrpcCluster extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet { + @Before + public void setup() { + final RaftProperties p = getProperties(); + RaftClientConfigKeys.DataStream.setRequestTimeout(p, TimeDuration.ONE_MINUTE); + RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4); + RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB")); + RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16); + } + @Override public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) { - RoutingTable.Builder builder = RoutingTable.newBuilder(); final List<RaftPeerId> others = peers.stream() - .filter(p -> !p.getId().equals(primary.getId())).map(v -> v.getId()) + .map(RaftPeer::getId).filter(id -> !id.equals(primary.getId())) .collect(Collectors.toList()); - builder.addSuccessors(primary.getId(), others); - return builder.build(); + return RoutingTable.newBuilder() + .addSuccessors(primary.getId(), others) + .build(); } }
