This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b07bb21333 HDDS-11223. Fix iteration over ChunkBufferImplWithByteBufferList (#6999)
b07bb21333 is described below
commit b07bb21333781ae8160528c4aa14cd99fff448f2
Author: Cyrill <[email protected]>
AuthorDate: Mon Jul 29 02:43:26 2024 +0300
HDDS-11223. Fix iteration over ChunkBufferImplWithByteBufferList (#6999)
---
.../common/ChunkBufferImplWithByteBufferList.java | 41 +++++++++++--
.../TestChunkBufferImplWithByteBufferList.java | 70 ++++++++++++++++++++++
2 files changed, 107 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index 7c3a0c7d2d..a3b5f9d2ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -61,6 +61,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
private void findCurrent() {
boolean found = false;
+ limitPrecedingCurrent = 0;
for (int i = 0; i < buffers.size(); i++) {
final ByteBuffer buf = buffers.get(i);
final int pos = buf.position();
@@ -185,6 +186,8 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
*/
@Override
public Iterable<ByteBuffer> iterate(int bufferSize) {
+ Preconditions.checkArgument(bufferSize > 0);
+
return () -> new Iterator<ByteBuffer>() {
@Override
public boolean hasNext() {
@@ -198,10 +201,40 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
}
findCurrent();
ByteBuffer current = buffers.get(currentIndex);
- final ByteBuffer duplicated = current.duplicate();
- duplicated.limit(current.limit());
- current.position(current.limit());
- return duplicated;
+
+ // If current buffer has enough space or it's the last one, return it.
+ if (current.remaining() >= bufferSize || currentIndex == buffers.size() - 1) {
+ final ByteBuffer duplicated = current.duplicate();
+ int duplicatedLimit = Math.min(current.position() + bufferSize, current.limit());
+ duplicated.limit(duplicatedLimit);
+ duplicated.position(current.position());
+
+ current.position(duplicatedLimit);
+ return duplicated;
+ }
+
+ // Otherwise, create a new buffer.
+ int newBufferSize = Math.min(bufferSize, remaining());
+ ByteBuffer allocated = ByteBuffer.allocate(newBufferSize);
+ int remainingToFill = allocated.remaining();
+
+ while (remainingToFill > 0) {
+ final ByteBuffer b = current();
+ int bytes = Math.min(b.remaining(), remainingToFill);
+ b.limit(b.position() + bytes);
+ allocated.put(b);
+ remainingToFill -= bytes;
+ advanceCurrent();
+ }
+
+ allocated.flip();
+
+ // Up-to-date current.
+ current = buffers.get(currentIndex);
+ // Reset
+ current.limit(current.capacity());
+
+ return allocated;
}
};
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
index 072c07be64..b06b4b5633 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -67,6 +67,76 @@ public class TestChunkBufferImplWithByteBufferList {
assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
}
+ @Test
+ public void testIterateSmallerOverSingleChunk() {
+ ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(ByteBuffer.allocate(100)));
+
+ assertEquals(0, subject.position());
+ assertEquals(100, subject.remaining());
+ assertEquals(100, subject.limit());
+
+ subject.iterate(25).forEach(buffer -> assertEquals(25, buffer.remaining()));
+
+ assertEquals(100, subject.position());
+ assertEquals(0, subject.remaining());
+ assertEquals(100, subject.limit());
+ }
+
+ @Test
+ public void testIterateOverMultipleChunksFitChunkSize() {
+ ByteBuffer b1 = ByteBuffer.allocate(100);
+ ByteBuffer b2 = ByteBuffer.allocate(100);
+ ByteBuffer b3 = ByteBuffer.allocate(100);
+ ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));
+
+ assertEquals(0, subject.position());
+ assertEquals(300, subject.remaining());
+ assertEquals(300, subject.limit());
+
+ subject.iterate(100).forEach(buffer -> assertEquals(100, buffer.remaining()));
+
+ assertEquals(300, subject.position());
+ assertEquals(0, subject.remaining());
+ assertEquals(300, subject.limit());
+ }
+
+ @Test
+ public void testIterateOverMultipleChunksSmallerChunks() {
+ ByteBuffer b1 = ByteBuffer.allocate(100);
+ ByteBuffer b2 = ByteBuffer.allocate(100);
+ ByteBuffer b3 = ByteBuffer.allocate(100);
+ ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));
+
+ assertEquals(0, subject.position());
+ assertEquals(300, subject.remaining());
+ assertEquals(300, subject.limit());
+
+ subject.iterate(50).forEach(buffer -> assertEquals(50, buffer.remaining()));
+
+ assertEquals(300, subject.position());
+ assertEquals(0, subject.remaining());
+ assertEquals(300, subject.limit());
+ }
+
+ @Test
+ public void testIterateOverMultipleChunksBiggerChunks() {
+ ByteBuffer b1 = ByteBuffer.allocate(100);
+ ByteBuffer b2 = ByteBuffer.allocate(100);
+ ByteBuffer b3 = ByteBuffer.allocate(100);
+ ByteBuffer b4 = ByteBuffer.allocate(100);
+ ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3, b4));
+
+ assertEquals(0, subject.position());
+ assertEquals(400, subject.remaining());
+ assertEquals(400, subject.limit());
+
+ subject.iterate(200).forEach(buffer -> assertEquals(200, buffer.remaining()));
+
+ assertEquals(400, subject.position());
+ assertEquals(0, subject.remaining());
+ assertEquals(400, subject.limit());
+ }
+
private static void assertEmpty(ChunkBuffer subject) {
assertEquals(0, subject.position());
assertEquals(0, subject.remaining());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]