This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 289db3ae6 RATIS-1647. Fix EI_EXPOSE_REP in DataStreamRequest
implementations (#716)
289db3ae6 is described below
commit 289db3ae64e8cf620eba882468dda661af0439bc
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Oct 3 22:45:17 2022 -0700
RATIS-1647. Fix EI_EXPOSE_REP in DataStreamRequest implementations (#716)
---
.../apache/ratis/client/api/DataStreamOutput.java | 14 +++++++++++++-
.../ratis/client/impl/DataStreamClientImpl.java | 17 ++++++++++++-----
.../impl/DataStreamRequestByteBuffer.java | 11 ++++++-----
.../impl/DataStreamRequestFilePositionCount.java | 12 +++++++-----
.../main/java/org/apache/ratis/io/WriteOption.java | 12 ++++++++++--
.../apache/ratis/protocol/DataStreamRequest.java | 3 ++-
.../ratis/protocol/DataStreamRequestHeader.java | 18 ++++++++++++------
.../ratis/netty/server/DataStreamManagement.java | 20 ++++++++++++++------
.../netty/server/DataStreamRequestByteBuf.java | 21 +++++++++++----------
9 files changed, 87 insertions(+), 41 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index be6d13ef5..cb5045927 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
@@ -37,7 +38,18 @@ public interface DataStreamOutput extends
CloseAsync<DataStreamReply> {
* @param options - options specifying how the data was written
* @return a future of the reply.
*/
- CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption...
options);
+ default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
WriteOption... options) {
+ return writeAsync(src, Arrays.asList(options));
+ }
+
+ /**
+ * Send out the data in the source buffer asynchronously.
+ *
+ * @param src the source buffer to be sent.
+ * @param options - options specifying how the data was written
+ * @return a future of the reply.
+ */
+ CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
Iterable<WriteOption> options);
/**
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 50f5ef4a0..cd1e1f256 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -49,6 +49,8 @@ import org.apache.ratis.util.SlidingWindow;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -123,10 +125,10 @@ public class DataStreamClientImpl implements
DataStreamClient {
this.header = request;
this.slidingWindow = new
SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId,
header.getCallId()));
final ByteBuffer buffer =
ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
- this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
+ this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(),
Collections.emptyList());
}
-
- private CompletableFuture<DataStreamReply> send(Type type, Object data,
long length, WriteOption... options) {
+ private CompletableFuture<DataStreamReply> send(Type type, Object data,
long length,
+ Iterable<WriteOption>
options) {
final DataStreamRequestHeader h =
new DataStreamRequestHeader(header.getClientId(), type,
header.getCallId(), streamOffset, length, options);
return orderedStreamAsync.sendRequest(h, data, slidingWindow);
@@ -136,7 +138,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
return future.thenCombine(headerFuture, (reply, headerReply) ->
headerReply.isSuccess()? reply : headerReply);
}
- private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data,
long length, WriteOption... options) {
+ private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data,
long length, Iterable<WriteOption> options) {
if (isClosed()) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
clientId + ": stream already closed, request=" + header));
@@ -153,12 +155,17 @@ public class DataStreamClientImpl implements
DataStreamClient {
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
WriteOption... options) {
+ return writeAsync(src, Arrays.asList(options));
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src,
Iterable<WriteOption> options) {
return writeAsyncImpl(src, src.remaining(), options);
}
@Override
public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount
src, WriteOption... options) {
- return writeAsyncImpl(src, src.getCount(), options);
+ return writeAsyncImpl(src, src.getCount(), Arrays.asList(options));
}
boolean isClosed() {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 80949f071..282b4f928 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -17,13 +17,15 @@
*/
package org.apache.ratis.datastream.impl;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
import org.apache.ratis.util.Preconditions;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
/**
* Implements {@link DataStreamRequest} with {@link ByteBuffer}.
@@ -31,17 +33,16 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer
implements DataStreamRequest {
- private WriteOption[] options;
+ private List<WriteOption> options;
public DataStreamRequestByteBuffer(DataStreamRequestHeader header,
ByteBuffer buffer) {
super(header.getClientId(), header.getType(), header.getStreamId(),
header.getStreamOffset(), buffer);
- this.options = header.getWriteOptions();
+ this.options =
Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
}
@Override
- @SuppressFBWarnings("EI_EXPOSE_REP")
- public WriteOption[] getWriteOptions() {
+ public List<WriteOption> getWriteOptions() {
return options;
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index 884145529..b1fb73620 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -17,11 +17,14 @@
*/
package org.apache.ratis.datastream.impl;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
/**
* Implements {@link DataStreamRequest} with {@link FilePositionCount}.
@@ -30,11 +33,11 @@ import org.apache.ratis.protocol.DataStreamRequestHeader;
*/
public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl
implements DataStreamRequest {
private final FilePositionCount file;
- private WriteOption[] options;
+ private final List<WriteOption> options;
public DataStreamRequestFilePositionCount(DataStreamRequestHeader header,
FilePositionCount file) {
super(header.getClientId(), header.getType(), header.getStreamId(),
header.getStreamOffset());
- this.options = header.getWriteOptions();
+ this.options =
Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
this.file = file;
}
@@ -49,8 +52,7 @@ public class DataStreamRequestFilePositionCount extends
DataStreamPacketImpl imp
}
@Override
- @SuppressFBWarnings("EI_EXPOSE_REP")
- public WriteOption[] getWriteOptions() {
+ public List<WriteOption> getWriteOptions() {
return options;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
index 276871495..b5f29a2f2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -17,8 +17,11 @@
*/
package org.apache.ratis.io;
+import java.util.Arrays;
+
public interface WriteOption {
- static boolean containsOption(WriteOption[] options, WriteOption target) {
+ static boolean containsOption(Iterable<WriteOption> options,
+ WriteOption target) {
for (WriteOption option : options) {
if (option == target) {
return true;
@@ -28,7 +31,12 @@ public interface WriteOption {
return false;
}
- default boolean isOneOf(WriteOption... options) {
+ static boolean containsOption(WriteOption[] options,
+ WriteOption target) {
+ return containsOption(Arrays.asList(options), target);
+ }
+
+ default boolean isOneOf(Iterable<WriteOption> options) {
return containsOption(options, this);
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index cf0920876..cde07c415 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -19,7 +19,8 @@
package org.apache.ratis.protocol;
import org.apache.ratis.io.WriteOption;
+import java.util.List;
public interface DataStreamRequest extends DataStreamPacket {
- WriteOption[] getWriteOptions();
+ List<WriteOption> getWriteOptions();
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index f2fbc159b..a0c68eff1 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,28 +18,34 @@
package org.apache.ratis.protocol;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
/**
* The header format is the same {@link DataStreamPacketHeader}
* since there are no additional fields.
*/
public class DataStreamRequestHeader extends DataStreamPacketHeader implements
DataStreamRequest {
- private final WriteOption[] options;
+ private final List<WriteOption> options;
- @SuppressFBWarnings("EI_EXPOSE_REP2")
public DataStreamRequestHeader(ClientId clientId, Type type, long streamId,
long streamOffset, long dataLength,
WriteOption... options) {
+ this(clientId, type, streamId, streamOffset, dataLength,
Arrays.asList(options));
+ }
+
+ public DataStreamRequestHeader(ClientId clientId, Type type, long streamId,
long streamOffset, long dataLength,
+ Iterable<WriteOption> options) {
super(clientId, type, streamId, streamOffset, dataLength);
- this.options = options;
+ this.options = Collections.unmodifiableList(Lists.newArrayList(options));
}
@Override
- @SuppressFBWarnings("EI_EXPOSE_REP")
- public WriteOption[] getWriteOptions() {
+ public List<WriteOption> getWriteOptions() {
return options;
}
}
\ No newline at end of file
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 96266789b..2f9aa695e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -90,7 +90,8 @@ public class DataStreamManagement {
this.metrics = metrics;
}
- CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor
executor) {
+ CompletableFuture<Long> write(ByteBuf buf, Iterable<WriteOption> options,
+ Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(writeFuture, executor,
n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options,
stream, executor)
@@ -112,7 +113,8 @@ public class DataStreamManagement {
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request,
Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(sendFuture, executor,
- n -> out.writeAsync(request.slice().nioBuffer(),
request.getWriteOptions())
+ n -> out.writeAsync(request.slice().nioBuffer(),
+ request.getWriteOptions())
.whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
@@ -282,13 +284,16 @@ public class DataStreamManagement {
return future.updateAndGet(previous -> previous.thenComposeAsync(function,
executor));
}
- static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[]
options, DataStream stream,
+ static CompletableFuture<Long> writeToAsync(ByteBuf buf,
+ Iterable<WriteOption> options,
+ DataStream stream,
Executor defaultExecutor) {
final Executor e =
Optional.ofNullable(stream.getExecutor()).orElse(defaultExecutor);
return CompletableFuture.supplyAsync(() -> writeTo(buf, options, stream),
e);
}
- static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) {
+ static long writeTo(ByteBuf buf, Iterable<WriteOption> options,
+ DataStream stream) {
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
@@ -389,7 +394,8 @@ public class DataStreamManagement {
private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx, ByteBuf buf,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
- boolean close = WriteOption.containsOption(request.getWriteOptions(),
StandardWriteOption.CLOSE);
+ boolean close = WriteOption.containsOption(request.getWriteOptions(),
+ StandardWriteOption.CLOSE);
ClientInvocationId key =
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
@@ -418,7 +424,9 @@ public class DataStreamManagement {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
- localWrite = info.getLocal().write(buf, request.getWriteOptions(),
writeExecutor);
+ localWrite = info.getLocal().write(buf,
+ request.getWriteOptions(),
+ writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request,
requestExecutor));
} else {
throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 29bda7094..84803eb0e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -18,16 +18,19 @@
package org.apache.ratis.netty.server;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import java.util.Collections;
+import java.util.List;
+
/**
* Implements {@link DataStreamRequest} with {@link ByteBuf}.
*
@@ -35,19 +38,18 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
*/
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements
DataStreamRequest {
private final ByteBuf buf;
- private final WriteOption[] options;
+ private final List<WriteOption> options;
- @SuppressFBWarnings("EI_EXPOSE_REP2")
- public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId,
long streamOffset, WriteOption[] options,
- ByteBuf buf) {
+ public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId,
long streamOffset,
+ List<WriteOption> options, ByteBuf buf) {
super(clientId, type, streamId, streamOffset);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
- this.options = options;
+ this.options = Collections.unmodifiableList(Lists.newArrayList(options));
}
public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf)
{
this(header.getClientId(), header.getType(), header.getStreamId(),
header.getStreamOffset(),
- header.getWriteOptions(), buf);
+ header.getWriteOptions(), buf);
}
@Override
@@ -60,8 +62,7 @@ public class DataStreamRequestByteBuf extends
DataStreamPacketImpl implements Da
}
@Override
- @SuppressFBWarnings("EI_EXPOSE_REP")
- public WriteOption[] getWriteOptions() {
+ public List<WriteOption> getWriteOptions() {
return options;
}
}