This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 202b0c731b HDDS-11860. Improve BufferUtils.writeFully. (#7564)
202b0c731b is described below
commit 202b0c731bcb2a25a2fcb0d4826b326bb0b79865
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Dec 13 23:44:58 2024 -0800
HDDS-11860. Improve BufferUtils.writeFully. (#7564)
---
.../apache/hadoop/hdds/utils/db/CodecBuffer.java | 6 +-
.../common/ChunkBufferImplWithByteBufferList.java | 5 +-
.../ozone/common/IncrementalChunkBuffer.java | 6 +-
.../hadoop/ozone/common/utils/BufferUtils.java | 36 +++++++++-
.../hadoop/hdds/utils/MockGatheringChannel.java | 77 +++++++++++++++++-----
5 files changed, 99 insertions(+), 31 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index 1ac293b301..87be912bb5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -58,9 +58,9 @@ public class CodecBuffer implements UncheckedAutoCloseable {
private static class Factory {
private static volatile BiFunction<ByteBuf, Object, CodecBuffer> constructor
= CodecBuffer::new;
- static void set(BiFunction<ByteBuf, Object, CodecBuffer> f) {
+ static void set(BiFunction<ByteBuf, Object, CodecBuffer> f, String name) {
constructor = f;
- LOG.info("Successfully set constructor to " + f);
+ LOG.info("Successfully set constructor to {}: {}", name, f);
}
static CodecBuffer newCodecBuffer(ByteBuf buf) {
@@ -89,7 +89,7 @@ public class CodecBuffer implements UncheckedAutoCloseable {
* Note that there is a severe performance penalty for leak detection.
*/
public static void enableLeakDetection() {
- Factory.set(LeakDetector::newCodecBuffer);
+ Factory.set(LeakDetector::newCodecBuffer, "LeakDetector::newCodecBuffer");
}
/** The size of a buffer. */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index f9992c9442..e1f169662f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -248,10 +248,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
- long written = 0;
- for (ByteBuffer buf : buffers) {
- written += BufferUtils.writeFully(channel, buf);
- }
+ final long written = BufferUtils.writeFully(channel, buffers);
findCurrent();
return written;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 249c67e4dd..732af4b685 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -280,11 +280,7 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
- long written = 0;
- for (ByteBuffer buf : buffers) {
- written += BufferUtils.writeFully(channel, buf);
- }
- return written;
+ return BufferUtils.writeFully(channel, buffers);
}
@Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index 01b2ec0af1..a266c3615b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -26,11 +26,16 @@ import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utilities for buffers.
*/
public final class BufferUtils {
+ public static final Logger LOG = LoggerFactory.getLogger(BufferUtils.class);
+
+ private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {};
/** Utility classes should not be constructed. **/
private BufferUtils() {
@@ -147,11 +152,38 @@ public final class BufferUtils {
long written = 0;
while (bb.remaining() > 0) {
int n = ch.write(bb);
- if (n <= 0) {
- throw new IllegalStateException("no bytes written");
+ if (n < 0) {
+ throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch);
}
written += n;
}
return written;
}
+
+ public static long writeFully(GatheringByteChannel ch, List<ByteBuffer> buffers) throws IOException {
+ return BufferUtils.writeFully(ch, buffers.toArray(EMPTY_BYTE_BUFFER_ARRAY));
+ }
+
+ public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < buffers.length; i++) {
+ LOG.debug("buffer[{}]: remaining={}", i, buffers[i].remaining());
+ }
+ }
+
+ long written = 0;
+ for (int i = 0; i < buffers.length; i++) {
+ while (buffers[i].remaining() > 0) {
+ final long n = ch.write(buffers, i, buffers.length - i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("buffer[{}]: remaining={}, written={}", i, buffers[i].remaining(), n);
+ }
+ if (n < 0) {
+ throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch);
+ }
+ written += n;
+ }
+ }
+ return written;
+ }
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
index 8f9256cd77..83b6851238 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
@@ -21,8 +21,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* {@link GatheringByteChannel} implementation for testing. Delegates
@@ -45,11 +48,32 @@ public class MockGatheringChannel implements GatheringByteChannel {
checkElementIndex(offset, srcs.length, "offset");
checkElementIndex(offset + length - 1, srcs.length, "offset+length");
- long bytes = 0;
- for (ByteBuffer b : srcs) {
- bytes += write(b);
+ long fullLength = 0;
+ for (int i = offset; i < srcs.length; i++) {
+ fullLength += srcs[i].remaining();
}
- return bytes;
+ if (fullLength <= 0) {
+ return 0;
+ }
+
+ // simulate partial write by setting a random partial length
+ final long partialLength = ThreadLocalRandom.current().nextLong(fullLength + 1);
+
+ long written = 0;
+ for (int i = offset; i < srcs.length; i++) {
+ for (final ByteBuffer src = srcs[i]; src.hasRemaining();) {
+ final long n = partialLength - written; // write at most n bytes
+ assertThat(n).isGreaterThanOrEqualTo(0);
+ if (n == 0) {
+ return written;
+ }
+
+ final int remaining = src.remaining();
+ final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining - n);
+ written += adjustedWrite(src, adjustment);
+ }
+ }
+ return written;
}
@Override
@@ -59,21 +83,40 @@ public class MockGatheringChannel implements GatheringByteChannel {
@Override
public int write(ByteBuffer src) throws IOException {
- // If src has more than 1 byte left, simulate partial write by adjusting limit.
- // Remaining 1 byte should be written on next call.
- // This helps verify that the caller ensures buffer is written fully.
- final int adjustment = 1;
- final boolean limitWrite = src.remaining() > adjustment;
- if (limitWrite) {
- src.limit(src.limit() - adjustment);
+ final int remaining = src.remaining();
+ if (remaining <= 0) {
+ return 0;
}
- try {
- return delegate.write(src);
- } finally {
- if (limitWrite) {
- src.limit(src.limit() + adjustment);
- }
+ // Simulate partial write by a random adjustment.
+ final int adjustment = ThreadLocalRandom.current().nextInt(remaining + 1);
+ return adjustedWrite(src, adjustment);
+ }
+
+ /** Simulate partial write by the given adjustment. */
+ private int adjustedWrite(ByteBuffer src, int adjustment) throws IOException {
+ assertThat(adjustment).isGreaterThanOrEqualTo(0);
+ final int remaining = src.remaining();
+ if (remaining <= 0) {
+ return 0;
}
+ assertThat(adjustment).isLessThanOrEqualTo(remaining);
+
+ final int oldLimit = src.limit();
+ final int newLimit = oldLimit - adjustment;
+ src.limit(newLimit);
+ assertEquals(newLimit, src.limit());
+ final int toWrite = remaining - adjustment;
+ assertEquals(toWrite, src.remaining());
+
+ final int written = delegate.write(src);
+ assertEquals(newLimit, src.limit());
+ assertEquals(toWrite - written, src.remaining());
+
+ src.limit(oldLimit);
+ assertEquals(oldLimit, src.limit());
+ assertEquals(remaining - written, src.remaining());
+
+ return written;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]