This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e17f92c4fc HDDS-10821. Ensure ChunkBuffer fully writes buffer to 
FileChannel (#6652)
e17f92c4fc is described below

commit e17f92c4fce90c30ab6f3eb17b4d4e38e363c2e8
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Dec 4 11:06:36 2024 -0800

    HDDS-10821. Ensure ChunkBuffer fully writes buffer to FileChannel (#6652)
---
 .../ozone/common/ChunkBufferImplWithByteBuffer.java    |  3 ++-
 .../common/ChunkBufferImplWithByteBufferList.java      |  9 +++++++--
 .../hadoop/ozone/common/IncrementalChunkBuffer.java    |  7 ++++++-
 .../apache/hadoop/ozone/common/utils/BufferUtils.java  | 18 ++++++++++++++++++
 .../apache/hadoop/hdds/utils/MockGatheringChannel.java | 16 +++++++++++++++-
 5 files changed, 48 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 782476eb56..254be93dc4 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -27,6 +27,7 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Function;
 
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 
@@ -102,7 +103,7 @@ final class ChunkBufferImplWithByteBuffer implements 
ChunkBuffer {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    return channel.write(buffer);
+    return BufferUtils.writeFully(channel, buffer);
   }
 
   @Override
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index a3b5f9d2ee..f9992c9442 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -246,9 +248,12 @@ public class ChunkBufferImplWithByteBufferList implements 
ChunkBuffer {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
+    long written = 0;
+    for (ByteBuffer buf : buffers) {
+      written += BufferUtils.writeFully(channel, buf);
+    }
     findCurrent();
-    return bytes;
+    return written;
   }
 
   @Override
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index dda4fae0d2..249c67e4dd 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -279,7 +280,11 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    return channel.write(buffers.toArray(new ByteBuffer[0]));
+    long written = 0;
+    for (ByteBuffer buf : buffers) {
+      written += BufferUtils.writeFully(channel, buf);
+    }
+    return written;
   }
 
   @Override
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index c6ad754f19..01b2ec0af1 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -19,7 +19,10 @@
 package org.apache.hadoop.ozone.common.utils;
 
 import com.google.common.base.Preconditions;
+
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -136,4 +139,19 @@ public final class BufferUtils {
     }
     return Math.toIntExact(n);
   }
+
+  /**
+   * Write all remaining bytes in buffer to the given channel.
+   */
+  public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws 
IOException {
+    long written = 0;
+    while (bb.remaining() > 0) {
+      int n = ch.write(bb);
+      if (n <= 0) {
+        throw new IllegalStateException("no bytes written");
+      }
+      written += n;
+    }
+    return written;
+  }
 }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
index ce6f58dadc..8f9256cd77 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
@@ -59,7 +59,21 @@ public class MockGatheringChannel implements 
GatheringByteChannel {
 
   @Override
   public int write(ByteBuffer src) throws IOException {
-    return delegate.write(src);
+    // If src has more than 1 byte left, simulate partial write by adjusting 
limit.
+    // Remaining 1 byte should be written on next call.
+    // This helps verify that the caller ensures buffer is written fully.
+    final int adjustment = 1;
+    final boolean limitWrite = src.remaining() > adjustment;
+    if (limitWrite) {
+      src.limit(src.limit() - adjustment);
+    }
+    try {
+      return delegate.write(src);
+    } finally {
+      if (limitWrite) {
+        src.limit(src.limit() + adjustment);
+      }
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to