This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 202b0c731b HDDS-11860. Improve BufferUtils.writeFully. (#7564)
202b0c731b is described below

commit 202b0c731bcb2a25a2fcb0d4826b326bb0b79865
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Dec 13 23:44:58 2024 -0800

    HDDS-11860. Improve BufferUtils.writeFully. (#7564)
---
 .../apache/hadoop/hdds/utils/db/CodecBuffer.java   |  6 +-
 .../common/ChunkBufferImplWithByteBufferList.java  |  5 +-
 .../ozone/common/IncrementalChunkBuffer.java       |  6 +-
 .../hadoop/ozone/common/utils/BufferUtils.java     | 36 +++++++++-
 .../hadoop/hdds/utils/MockGatheringChannel.java    | 77 +++++++++++++++++-----
 5 files changed, 99 insertions(+), 31 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index 1ac293b301..87be912bb5 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -58,9 +58,9 @@ public class CodecBuffer implements UncheckedAutoCloseable {
   private static class Factory {
     private static volatile BiFunction<ByteBuf, Object, CodecBuffer> 
constructor
         = CodecBuffer::new;
-    static void set(BiFunction<ByteBuf, Object, CodecBuffer> f) {
+    static void set(BiFunction<ByteBuf, Object, CodecBuffer> f, String name) {
       constructor = f;
-      LOG.info("Successfully set constructor to " + f);
+      LOG.info("Successfully set constructor to {}: {}", name, f);
     }
 
     static CodecBuffer newCodecBuffer(ByteBuf buf) {
@@ -89,7 +89,7 @@ public class CodecBuffer implements UncheckedAutoCloseable {
    * Note that there is a severe performance penalty for leak detection.
    */
   public static void enableLeakDetection() {
-    Factory.set(LeakDetector::newCodecBuffer);
+    Factory.set(LeakDetector::newCodecBuffer, "LeakDetector::newCodecBuffer");
   }
 
   /** The size of a buffer. */
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index f9992c9442..e1f169662f 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -248,10 +248,7 @@ public class ChunkBufferImplWithByteBufferList implements 
ChunkBuffer {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    long written = 0;
-    for (ByteBuffer buf : buffers) {
-      written += BufferUtils.writeFully(channel, buf);
-    }
+    final long written = BufferUtils.writeFully(channel, buffers);
     findCurrent();
     return written;
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 249c67e4dd..732af4b685 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -280,11 +280,7 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    long written = 0;
-    for (ByteBuffer buf : buffers) {
-      written += BufferUtils.writeFully(channel, buf);
-    }
-    return written;
+    return BufferUtils.writeFully(channel, buffers);
   }
 
   @Override
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index 01b2ec0af1..a266c3615b 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -26,11 +26,16 @@ import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities for buffers.
  */
 public final class BufferUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(BufferUtils.class);
+
+  private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {};
 
   /** Utility classes should not be constructed. **/
   private BufferUtils() {
@@ -147,11 +152,38 @@ public final class BufferUtils {
     long written = 0;
     while (bb.remaining() > 0) {
       int n = ch.write(bb);
-      if (n <= 0) {
-        throw new IllegalStateException("no bytes written");
+      if (n < 0) {
+        throw new IllegalStateException("GatheringByteChannel.write returns " 
+ n + " < 0 for " + ch);
       }
       written += n;
     }
     return written;
   }
+
+  public static long writeFully(GatheringByteChannel ch, List<ByteBuffer> 
buffers) throws IOException {
+    return BufferUtils.writeFully(ch, 
buffers.toArray(EMPTY_BYTE_BUFFER_ARRAY));
+  }
+
+  public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) 
throws IOException {
+    if (LOG.isDebugEnabled()) {
+      for (int i = 0; i < buffers.length; i++) {
+        LOG.debug("buffer[{}]: remaining={}", i, buffers[i].remaining());
+      }
+    }
+
+    long written = 0;
+    for (int i = 0; i < buffers.length; i++) {
+      while (buffers[i].remaining() > 0) {
+        final long n = ch.write(buffers, i, buffers.length - i);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("buffer[{}]: remaining={}, written={}", i, 
buffers[i].remaining(), n);
+        }
+        if (n < 0) {
+          throw new IllegalStateException("GatheringByteChannel.write returns 
" + n + " < 0 for " + ch);
+        }
+        written += n;
+      }
+    }
+    return written;
+  }
 }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
index 8f9256cd77..83b6851238 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
@@ -21,8 +21,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * {@link GatheringByteChannel} implementation for testing.  Delegates
@@ -45,11 +48,32 @@ public class MockGatheringChannel implements 
GatheringByteChannel {
     checkElementIndex(offset, srcs.length, "offset");
     checkElementIndex(offset + length - 1, srcs.length, "offset+length");
 
-    long bytes = 0;
-    for (ByteBuffer b : srcs) {
-      bytes += write(b);
+    long fullLength = 0;
+    for (int i = offset; i < srcs.length; i++) {
+      fullLength += srcs[i].remaining();
     }
-    return bytes;
+    if (fullLength <= 0) {
+      return 0;
+    }
+
+    // simulate partial write by setting a random partial length
+    final long partialLength = ThreadLocalRandom.current().nextLong(fullLength 
+ 1);
+
+    long written = 0;
+    for (int i = offset; i < srcs.length; i++) {
+      for (final ByteBuffer src = srcs[i]; src.hasRemaining();) {
+        final long n = partialLength - written;  // write at most n bytes
+        assertThat(n).isGreaterThanOrEqualTo(0);
+        if (n == 0) {
+          return written;
+        }
+
+        final int remaining = src.remaining();
+        final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining 
- n);
+        written += adjustedWrite(src, adjustment);
+      }
+    }
+    return written;
   }
 
   @Override
@@ -59,21 +83,40 @@ public class MockGatheringChannel implements 
GatheringByteChannel {
 
   @Override
   public int write(ByteBuffer src) throws IOException {
-    // If src has more than 1 byte left, simulate partial write by adjusting 
limit.
-    // Remaining 1 byte should be written on next call.
-    // This helps verify that the caller ensures buffer is written fully.
-    final int adjustment = 1;
-    final boolean limitWrite = src.remaining() > adjustment;
-    if (limitWrite) {
-      src.limit(src.limit() - adjustment);
+    final int remaining = src.remaining();
+    if (remaining <= 0) {
+      return 0;
     }
-    try {
-      return delegate.write(src);
-    } finally {
-      if (limitWrite) {
-        src.limit(src.limit() + adjustment);
-      }
+    // Simulate partial write by a random adjustment.
+    final int adjustment = ThreadLocalRandom.current().nextInt(remaining + 1);
+    return adjustedWrite(src, adjustment);
+  }
+
+  /** Simulate partial write by the given adjustment. */
+  private int adjustedWrite(ByteBuffer src, int adjustment) throws IOException 
{
+    assertThat(adjustment).isGreaterThanOrEqualTo(0);
+    final int remaining = src.remaining();
+    if (remaining <= 0) {
+      return 0;
     }
+    assertThat(adjustment).isLessThanOrEqualTo(remaining);
+
+    final int oldLimit = src.limit();
+    final int newLimit = oldLimit - adjustment;
+    src.limit(newLimit);
+    assertEquals(newLimit, src.limit());
+    final int toWrite = remaining - adjustment;
+    assertEquals(toWrite, src.remaining());
+
+    final int written = delegate.write(src);
+    assertEquals(newLimit, src.limit());
+    assertEquals(toWrite - written, src.remaining());
+
+    src.limit(oldLimit);
+    assertEquals(oldLimit, src.limit());
+    assertEquals(remaining - written, src.remaining());
+
+    return written;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to