This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 33fcf16ef099fa108877559bde57655c8bb71a03 Author: Swaminathan Balachandran <[email protected]> AuthorDate: Mon Oct 3 22:45:17 2022 -0700 RATIS-1647. Fix EI_EXPOSE_REP in DataStreamRequest implementations (#716) (cherry picked from commit 289db3ae64e8cf620eba882468dda661af0439bc) --- .../apache/ratis/client/api/DataStreamOutput.java | 14 +++++++++++++- .../ratis/client/impl/DataStreamClientImpl.java | 17 ++++++++++++----- .../impl/DataStreamRequestByteBuffer.java | 11 ++++++----- .../impl/DataStreamRequestFilePositionCount.java | 12 +++++++----- .../main/java/org/apache/ratis/io/WriteOption.java | 12 ++++++++++-- .../apache/ratis/protocol/DataStreamRequest.java | 3 ++- .../ratis/protocol/DataStreamRequestHeader.java | 18 ++++++++++++------ .../ratis/netty/server/DataStreamManagement.java | 20 ++++++++++++++------ .../netty/server/DataStreamRequestByteBuf.java | 21 +++++++++++---------- 9 files changed, 87 insertions(+), 41 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java index be6d13ef5..cb5045927 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java @@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftClientReply; import java.io.File; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; /** An asynchronous output stream supporting zero buffer copying. */ @@ -37,7 +38,18 @@ public interface DataStreamOutput extends CloseAsync<DataStreamReply> { * @param options - options specifying how the data was written * @return a future of the reply. */ - CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options); + default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) { + return writeAsync(src, Arrays.asList(options)); + } + + /** + * Send out the data in the source buffer asynchronously. + * + * @param src the source buffer to be sent. + * @param options - options specifying how the data was written + * @return a future of the reply. + */ + CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options); /** diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java index 7e04d2f2c..b944d25e5 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java @@ -46,6 +46,8 @@ import org.apache.ratis.util.SlidingWindow; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -107,10 +109,10 @@ public class DataStreamClientImpl implements DataStreamClient { this.header = request; this.slidingWindow = new SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, header.getCallId())); final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header); - this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining()); + this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(), Collections.emptyList()); } - - private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) { + private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, + Iterable<WriteOption> options) { final DataStreamRequestHeader h = new DataStreamRequestHeader(header.getClientId(), type, header.getCallId(), streamOffset, length, options); return orderedStreamAsync.sendRequest(h, data, slidingWindow); @@ -120,7 +122,7 @@ public class DataStreamClientImpl implements DataStreamClient { return future.thenCombine(headerFuture, (reply, headerReply) -> headerReply.isSuccess()? reply : headerReply); } - private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, WriteOption... options) { + private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, Iterable<WriteOption> options) { if (isClosed()) { return JavaUtils.completeExceptionally(new AlreadyClosedException( clientId + ": stream already closed, request=" + header)); @@ -136,12 +138,17 @@ public class DataStreamClientImpl implements DataStreamClient { @Override public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) { + return writeAsync(src, Arrays.asList(options)); + } + + @Override + public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) { return writeAsyncImpl(src, src.remaining(), options); } @Override public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, WriteOption... options) { - return writeAsyncImpl(src, src.getCount(), options); + return writeAsyncImpl(src, src.getCount(), Arrays.asList(options)); } boolean isClosed() { diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java index 80949f071..282b4f928 100644 --- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java +++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java @@ -17,13 +17,15 @@ */ package org.apache.ratis.datastream.impl; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.io.WriteOption; import org.apache.ratis.protocol.DataStreamRequest; import org.apache.ratis.protocol.DataStreamRequestHeader; +import org.apache.ratis.thirdparty.com.google.common.collect.Lists; import org.apache.ratis.util.Preconditions; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; /** * Implements {@link DataStreamRequest} with {@link ByteBuffer}. @@ -31,17 +33,16 @@ import java.nio.ByteBuffer; * This class is immutable. */ public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest { - private WriteOption[] options; + private List<WriteOption> options; public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) { super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), buffer); - this.options = header.getWriteOptions(); + this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions())); Preconditions.assertTrue(header.getDataLength() == buffer.remaining()); } @Override - @SuppressFBWarnings("EI_EXPOSE_REP") - public WriteOption[] getWriteOptions() { + public List<WriteOption> getWriteOptions() { return options; } } diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java index 884145529..b1fb73620 100644 --- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java +++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java @@ -17,11 +17,14 @@ */ package org.apache.ratis.datastream.impl; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.io.FilePositionCount; import org.apache.ratis.io.WriteOption; import org.apache.ratis.protocol.DataStreamRequest; import org.apache.ratis.protocol.DataStreamRequestHeader; +import org.apache.ratis.thirdparty.com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; /** * Implements {@link DataStreamRequest} with {@link FilePositionCount}. @@ -30,11 +33,11 @@ import org.apache.ratis.protocol.DataStreamRequestHeader; */ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest { private final FilePositionCount file; - private WriteOption[] options; + private final List<WriteOption> options; public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) { super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset()); - this.options = header.getWriteOptions(); + this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions())); this.file = file; } @@ -49,8 +52,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp } @Override - @SuppressFBWarnings("EI_EXPOSE_REP") - public WriteOption[] getWriteOptions() { + public List<WriteOption> getWriteOptions() { return options; } } diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java index 276871495..b5f29a2f2 100644 --- a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java +++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java @@ -17,8 +17,11 @@ */ package org.apache.ratis.io; +import java.util.Arrays; + public interface WriteOption { - static boolean containsOption(WriteOption[] options, WriteOption target) { + static boolean containsOption(Iterable<WriteOption> options, + WriteOption target) { for (WriteOption option : options) { if (option == target) { return true; @@ -28,7 +31,12 @@ public interface WriteOption { return false; } - default boolean isOneOf(WriteOption... options) { + static boolean containsOption(WriteOption[] options, + WriteOption target) { + return containsOption(Arrays.asList(options), target); + } + + default boolean isOneOf(Iterable<WriteOption> options) { return containsOption(options, this); } } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java index cf0920876..cde07c415 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java @@ -19,7 +19,8 @@ package org.apache.ratis.protocol; import org.apache.ratis.io.WriteOption; +import java.util.List; public interface DataStreamRequest extends DataStreamPacket { - WriteOption[] getWriteOptions(); + List<WriteOption> getWriteOptions(); } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java index f2fbc159b..a0c68eff1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java @@ -18,28 +18,34 @@ package org.apache.ratis.protocol; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.io.WriteOption; import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type; +import org.apache.ratis.thirdparty.com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; /** * The header format is the same {@link DataStreamPacketHeader} * since there are no additional fields. */ public class DataStreamRequestHeader extends DataStreamPacketHeader implements DataStreamRequest { - private final WriteOption[] options; + private final List<WriteOption> options; - @SuppressFBWarnings("EI_EXPOSE_REP2") public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength, WriteOption... options) { + this(clientId, type, streamId, streamOffset, dataLength, Arrays.asList(options)); + } + + public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength, + Iterable<WriteOption> options) { super(clientId, type, streamId, streamOffset, dataLength); - this.options = options; + this.options = Collections.unmodifiableList(Lists.newArrayList(options)); } @Override - @SuppressFBWarnings("EI_EXPOSE_REP") - public WriteOption[] getWriteOptions() { + public List<WriteOption> getWriteOptions() { return options; } } \ No newline at end of file diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 65e25f4dc..3a8f0fcba 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -91,7 +91,8 @@ public class DataStreamManagement { this.metrics = metrics; } - CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor executor) { + CompletableFuture<Long> write(ByteBuf buf, Iterable<WriteOption> options, + Executor executor) { final RequestContext context = metrics.start(); return composeAsync(writeFuture, executor, n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, stream, executor) @@ -113,7 +114,8 @@ public class DataStreamManagement { CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) { final RequestContext context = metrics.start(); return composeAsync(sendFuture, executor, - n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions()) + n -> out.writeAsync(request.slice().nioBuffer(), + request.getWriteOptions()) .whenComplete((l, e) -> metrics.stop(context, e == null))); } } @@ -283,13 +285,16 @@ public class DataStreamManagement { return future.updateAndGet(previous -> previous.thenComposeAsync(function, executor)); } - static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[] options, DataStream stream, + static CompletableFuture<Long> writeToAsync(ByteBuf buf, + Iterable<WriteOption> options, + DataStream stream, Executor defaultExecutor) { final Executor e = Optional.ofNullable(stream.getExecutor()).orElse(defaultExecutor); return CompletableFuture.supplyAsync(() -> writeTo(buf, options, stream), e); } - static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) { + static long writeTo(ByteBuf buf, Iterable<WriteOption> options, + DataStream stream) { final DataChannel channel = stream.getDataChannel(); long byteWritten = 0; for (ByteBuffer buffer : buf.nioBuffers()) { @@ -414,7 +419,8 @@ public class DataStreamManagement { private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) { - boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE); + boolean close = WriteOption.containsOption(request.getWriteOptions(), + StandardWriteOption.CLOSE); ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()); final StreamInfo info; if (request.getType() == Type.STREAM_HEADER) { @@ -443,7 +449,9 @@ public class DataStreamManagement { localWrite = CompletableFuture.completedFuture(0L); remoteWrites = Collections.emptyList(); } else if (request.getType() == Type.STREAM_DATA) { - localWrite = info.getLocal().write(buf, request.getWriteOptions(), writeExecutor); + localWrite = info.getLocal().write(buf, + request.getWriteOptions(), + writeExecutor); remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor)); } else { throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java index 29bda7094..84803eb0e 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java @@ -18,16 +18,19 @@ package org.apache.ratis.netty.server; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.datastream.impl.DataStreamPacketImpl; import org.apache.ratis.io.WriteOption; +import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.DataStreamRequest; import org.apache.ratis.protocol.DataStreamRequestHeader; -import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type; +import org.apache.ratis.thirdparty.com.google.common.collect.Lists; import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; +import java.util.Collections; +import java.util.List; + /** * Implements {@link DataStreamRequest} with {@link ByteBuf}. * @@ -35,19 +38,18 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; */ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest { private final ByteBuf buf; - private final WriteOption[] options; + private final List<WriteOption> options; - @SuppressFBWarnings("EI_EXPOSE_REP2") - public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, WriteOption[] options, - ByteBuf buf) { + public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, + List<WriteOption> options, ByteBuf buf) { super(clientId, type, streamId, streamOffset); this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER; - this.options = options; + this.options = Collections.unmodifiableList(Lists.newArrayList(options)); } public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) { this(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), - header.getWriteOptions(), buf); + header.getWriteOptions(), buf); } @Override @@ -60,8 +62,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da } @Override - @SuppressFBWarnings("EI_EXPOSE_REP") - public WriteOption[] getWriteOptions() { + public List<WriteOption> getWriteOptions() { return options; } }
