This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e17f92c4fc HDDS-10821. Ensure ChunkBuffer fully writes buffer to FileChannel (#6652)
e17f92c4fc is described below
commit e17f92c4fce90c30ab6f3eb17b4d4e38e363c2e8
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Dec 4 11:06:36 2024 -0800
HDDS-10821. Ensure ChunkBuffer fully writes buffer to FileChannel (#6652)
---
.../ozone/common/ChunkBufferImplWithByteBuffer.java | 3 ++-
.../common/ChunkBufferImplWithByteBufferList.java | 9 +++++++--
.../hadoop/ozone/common/IncrementalChunkBuffer.java | 7 ++++++-
.../apache/hadoop/ozone/common/utils/BufferUtils.java | 18 ++++++++++++++++++
.../apache/hadoop/hdds/utils/MockGatheringChannel.java | 16 +++++++++++++++-
5 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 782476eb56..254be93dc4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -27,6 +27,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -102,7 +103,7 @@ final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
- return channel.write(buffer);
+ return BufferUtils.writeFully(channel, buffer);
}
@Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index a3b5f9d2ee..f9992c9442 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
+
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
@@ -246,9 +248,12 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
- long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
+ long written = 0;
+ for (ByteBuffer buf : buffers) {
+ written += BufferUtils.writeFully(channel, buf);
+ }
findCurrent();
- return bytes;
+ return written;
}
@Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index dda4fae0d2..249c67e4dd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.common;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
@@ -279,7 +280,11 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
- return channel.write(buffers.toArray(new ByteBuffer[0]));
+ long written = 0;
+ for (ByteBuffer buf : buffers) {
+ written += BufferUtils.writeFully(channel, buf);
+ }
+ return written;
}
@Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index c6ad754f19..01b2ec0af1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -19,7 +19,10 @@
package org.apache.hadoop.ozone.common.utils;
import com.google.common.base.Preconditions;
+
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -136,4 +139,19 @@ public final class BufferUtils {
}
return Math.toIntExact(n);
}
+
+ /**
+ * Write all remaining bytes in buffer to the given channel.
+ */
+ public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOException {
+ long written = 0;
+ while (bb.remaining() > 0) {
+ int n = ch.write(bb);
+ if (n <= 0) {
+ throw new IllegalStateException("no bytes written");
+ }
+ written += n;
+ }
+ return written;
+ }
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
index ce6f58dadc..8f9256cd77 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
@@ -59,7 +59,21 @@ public class MockGatheringChannel implements GatheringByteChannel {
@Override
public int write(ByteBuffer src) throws IOException {
- return delegate.write(src);
+ // If src has more than 1 byte left, simulate partial write by adjusting limit.
+ // Remaining 1 byte should be written on next call.
+ // This helps verify that the caller ensures buffer is written fully.
+ final int adjustment = 1;
+ final boolean limitWrite = src.remaining() > adjustment;
+ if (limitWrite) {
+ src.limit(src.limit() - adjustment);
+ }
+ try {
+ return delegate.write(src);
+ } finally {
+ if (limitWrite) {
+ src.limit(src.limit() + adjustment);
+ }
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]