This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 289db3ae6 RATIS-1647. Fix EI_EXPOSE_REP in DataStreamRequest implementations (#716)
289db3ae6 is described below

commit 289db3ae64e8cf620eba882468dda661af0439bc
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Oct 3 22:45:17 2022 -0700

    RATIS-1647. Fix EI_EXPOSE_REP in DataStreamRequest implementations (#716)
---
 .../apache/ratis/client/api/DataStreamOutput.java   | 14 +++++++++++++-
 .../ratis/client/impl/DataStreamClientImpl.java     | 17 ++++++++++++-----
 .../impl/DataStreamRequestByteBuffer.java           | 11 ++++++-----
 .../impl/DataStreamRequestFilePositionCount.java    | 12 +++++++-----
 .../main/java/org/apache/ratis/io/WriteOption.java  | 12 ++++++++++--
 .../apache/ratis/protocol/DataStreamRequest.java    |  3 ++-
 .../ratis/protocol/DataStreamRequestHeader.java     | 18 ++++++++++++------
 .../ratis/netty/server/DataStreamManagement.java    | 20 ++++++++++++++------
 .../netty/server/DataStreamRequestByteBuf.java      | 21 +++++++++++----------
 9 files changed, 87 insertions(+), 41 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index be6d13ef5..cb5045927 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 
 /** An asynchronous output stream supporting zero buffer copying. */
@@ -37,7 +38,18 @@ public interface DataStreamOutput extends 
CloseAsync<DataStreamReply> {
    * @param options - options specifying how the data was written
    * @return a future of the reply.
    */
-  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... 
options);
+  default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, 
WriteOption... options) {
+    return writeAsync(src, Arrays.asList(options));
+  }
+
+  /**
+   * Send out the data in the source buffer asynchronously.
+   *
+   * @param src the source buffer to be sent.
+   * @param options - options specifying how the data was written
+   * @return a future of the reply.
+   */
+  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, 
Iterable<WriteOption> options);
 
 
   /**
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 50f5ef4a0..cd1e1f256 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -49,6 +49,8 @@ import org.apache.ratis.util.SlidingWindow;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -123,10 +125,10 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       this.header = request;
       this.slidingWindow = new 
SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, 
header.getCallId()));
       final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
-      this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
+      this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(), 
Collections.emptyList());
     }
-
-    private CompletableFuture<DataStreamReply> send(Type type, Object data, 
long length, WriteOption... options) {
+    private CompletableFuture<DataStreamReply> send(Type type, Object data, 
long length,
+                                                    Iterable<WriteOption> 
options) {
       final DataStreamRequestHeader h =
           new DataStreamRequestHeader(header.getClientId(), type, 
header.getCallId(), streamOffset, length, options);
       return orderedStreamAsync.sendRequest(h, data, slidingWindow);
@@ -136,7 +138,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       return future.thenCombine(headerFuture, (reply, headerReply) -> 
headerReply.isSuccess()? reply : headerReply);
     }
 
-    private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, 
long length, WriteOption... options) {
+    private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, 
long length, Iterable<WriteOption> options) {
       if (isClosed()) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             clientId + ": stream already closed, request=" + header));
@@ -153,12 +155,17 @@ public class DataStreamClientImpl implements 
DataStreamClient {
 
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, 
WriteOption... options) {
+      return writeAsync(src, Arrays.asList(options));
+    }
+
+    @Override
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, 
Iterable<WriteOption> options) {
       return writeAsyncImpl(src, src.remaining(), options);
     }
 
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount 
src, WriteOption... options) {
-      return writeAsyncImpl(src, src.getCount(), options);
+      return writeAsyncImpl(src, src.getCount(), Arrays.asList(options));
     }
 
     boolean isClosed() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 80949f071..282b4f928 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -17,13 +17,15 @@
  */
 package org.apache.ratis.datastream.impl;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
 import org.apache.ratis.util.Preconditions;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
@@ -31,17 +33,16 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer 
implements DataStreamRequest {
-  private WriteOption[] options;
+  private List<WriteOption> options;
 
   public DataStreamRequestByteBuffer(DataStreamRequestHeader header, 
ByteBuffer buffer) {
     super(header.getClientId(), header.getType(), header.getStreamId(), 
header.getStreamOffset(), buffer);
-    this.options = header.getWriteOptions();
+    this.options = 
Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
     Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
   }
 
   @Override
-  @SuppressFBWarnings("EI_EXPOSE_REP")
-  public WriteOption[] getWriteOptions() {
+  public List<WriteOption> getWriteOptions() {
     return options;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index 884145529..b1fb73620 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -17,11 +17,14 @@
  */
 package org.apache.ratis.datastream.impl;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Implements {@link DataStreamRequest} with {@link FilePositionCount}.
@@ -30,11 +33,11 @@ import org.apache.ratis.protocol.DataStreamRequestHeader;
  */
 public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl 
implements DataStreamRequest {
   private final FilePositionCount file;
-  private WriteOption[] options;
+  private final List<WriteOption> options;
 
   public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, 
FilePositionCount file) {
     super(header.getClientId(), header.getType(), header.getStreamId(), 
header.getStreamOffset());
-    this.options = header.getWriteOptions();
+    this.options = 
Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
     this.file = file;
   }
 
@@ -49,8 +52,7 @@ public class DataStreamRequestFilePositionCount extends 
DataStreamPacketImpl imp
   }
 
   @Override
-  @SuppressFBWarnings("EI_EXPOSE_REP")
-  public WriteOption[] getWriteOptions() {
+  public List<WriteOption> getWriteOptions() {
     return options;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
index 276871495..b5f29a2f2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -17,8 +17,11 @@
  */
 package org.apache.ratis.io;
 
+import java.util.Arrays;
+
 public interface WriteOption {
-  static boolean containsOption(WriteOption[] options, WriteOption target) {
+  static boolean containsOption(Iterable<WriteOption> options,
+                                WriteOption target) {
     for (WriteOption option : options) {
       if (option == target) {
         return true;
@@ -28,7 +31,12 @@ public interface WriteOption {
     return false;
   }
 
-  default boolean isOneOf(WriteOption... options) {
+  static boolean containsOption(WriteOption[] options,
+                                WriteOption target) {
+    return containsOption(Arrays.asList(options), target);
+  }
+
+  default boolean isOneOf(Iterable<WriteOption> options) {
     return containsOption(options, this);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index cf0920876..cde07c415 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -19,7 +19,8 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.io.WriteOption;
+import java.util.List;
 
 public interface DataStreamRequest extends DataStreamPacket {
-  WriteOption[] getWriteOptions();
+  List<WriteOption> getWriteOptions();
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index f2fbc159b..a0c68eff1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,28 +18,34 @@
 
 package org.apache.ratis.protocol;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 /**
  * The header format is the same {@link DataStreamPacketHeader}
  * since there are no additional fields.
  */
 public class DataStreamRequestHeader extends DataStreamPacketHeader implements 
DataStreamRequest {
 
-  private final WriteOption[] options;
+  private final List<WriteOption> options;
 
-  @SuppressFBWarnings("EI_EXPOSE_REP2")
   public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, 
long streamOffset, long dataLength,
       WriteOption... options) {
+    this(clientId, type, streamId, streamOffset, dataLength, 
Arrays.asList(options));
+  }
+
+  public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, 
long streamOffset, long dataLength,
+                                 Iterable<WriteOption> options) {
     super(clientId, type, streamId, streamOffset, dataLength);
-    this.options = options;
+    this.options = Collections.unmodifiableList(Lists.newArrayList(options));
   }
 
   @Override
-  @SuppressFBWarnings("EI_EXPOSE_REP")
-  public WriteOption[] getWriteOptions() {
+  public List<WriteOption> getWriteOptions() {
     return options;
   }
 }
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 96266789b..2f9aa695e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -90,7 +90,8 @@ public class DataStreamManagement {
       this.metrics = metrics;
     }
 
-    CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor 
executor) {
+    CompletableFuture<Long> write(ByteBuf buf, Iterable<WriteOption> options,
+                                  Executor executor) {
       final Timekeeper.Context context = metrics.start();
       return composeAsync(writeFuture, executor,
           n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, 
stream, executor)
@@ -112,7 +113,8 @@ public class DataStreamManagement {
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, 
Executor executor) {
       final Timekeeper.Context context = metrics.start();
       return composeAsync(sendFuture, executor,
-          n -> out.writeAsync(request.slice().nioBuffer(), 
request.getWriteOptions())
+          n -> out.writeAsync(request.slice().nioBuffer(),
+                          request.getWriteOptions())
               .whenComplete((l, e) -> metrics.stop(context, e == null)));
     }
   }
@@ -282,13 +284,16 @@ public class DataStreamManagement {
     return future.updateAndGet(previous -> previous.thenComposeAsync(function, 
executor));
   }
 
-  static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[] 
options, DataStream stream,
+  static CompletableFuture<Long> writeToAsync(ByteBuf buf,
+                                              Iterable<WriteOption> options,
+                                              DataStream stream,
       Executor defaultExecutor) {
     final Executor e = 
Optional.ofNullable(stream.getExecutor()).orElse(defaultExecutor);
     return CompletableFuture.supplyAsync(() -> writeTo(buf, options, stream), 
e);
   }
 
-  static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) {
+  static long writeTo(ByteBuf buf, Iterable<WriteOption> options,
+                      DataStream stream) {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
@@ -389,7 +394,8 @@ public class DataStreamManagement {
 
   private void readImpl(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx, ByteBuf buf,
       CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
-    boolean close = WriteOption.containsOption(request.getWriteOptions(), 
StandardWriteOption.CLOSE);
+    boolean close = WriteOption.containsOption(request.getWriteOptions(),
+            StandardWriteOption.CLOSE);
     ClientInvocationId key =  
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
@@ -418,7 +424,9 @@ public class DataStreamManagement {
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
-      localWrite = info.getLocal().write(buf, request.getWriteOptions(), 
writeExecutor);
+      localWrite = info.getLocal().write(buf,
+              request.getWriteOptions(),
+              writeExecutor);
       remoteWrites = info.applyToRemotes(out -> out.write(request, 
requestExecutor));
     } else {
       throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 29bda7094..84803eb0e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -18,16 +18,19 @@
 
 package org.apache.ratis.netty.server;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Implements {@link DataStreamRequest} with {@link ByteBuf}.
  *
@@ -35,19 +38,18 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
  */
 public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements 
DataStreamRequest {
   private final ByteBuf buf;
-  private final WriteOption[] options;
+  private final List<WriteOption> options;
 
-  @SuppressFBWarnings("EI_EXPOSE_REP2")
-  public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, 
long streamOffset, WriteOption[] options,
-      ByteBuf buf) {
+  public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, 
long streamOffset,
+                                  List<WriteOption> options, ByteBuf buf) {
     super(clientId, type, streamId, streamOffset);
     this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
-    this.options = options;
+    this.options = Collections.unmodifiableList(Lists.newArrayList(options));
   }
 
   public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) 
{
     this(header.getClientId(), header.getType(), header.getStreamId(), 
header.getStreamOffset(),
-        header.getWriteOptions(), buf);
+         header.getWriteOptions(), buf);
   }
 
   @Override
@@ -60,8 +62,7 @@ public class DataStreamRequestByteBuf extends 
DataStreamPacketImpl implements Da
   }
 
   @Override
-  @SuppressFBWarnings("EI_EXPOSE_REP")
-  public WriteOption[] getWriteOptions() {
+  public List<WriteOption> getWriteOptions() {
     return options;
   }
 }

Reply via email to