This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b07bb21333 HDDS-11223. Fix iteration over 
ChunkBufferImplWithByteBufferList (#6999)
b07bb21333 is described below

commit b07bb21333781ae8160528c4aa14cd99fff448f2
Author: Cyrill <[email protected]>
AuthorDate: Mon Jul 29 02:43:26 2024 +0300

    HDDS-11223. Fix iteration over ChunkBufferImplWithByteBufferList (#6999)
---
 .../common/ChunkBufferImplWithByteBufferList.java  | 41 +++++++++++--
 .../TestChunkBufferImplWithByteBufferList.java     | 70 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 4 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index 7c3a0c7d2d..a3b5f9d2ee 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -61,6 +61,7 @@ public class ChunkBufferImplWithByteBufferList implements 
ChunkBuffer {
 
   private void findCurrent() {
     boolean found = false;
+    limitPrecedingCurrent = 0;
     for (int i = 0; i < buffers.size(); i++) {
       final ByteBuffer buf = buffers.get(i);
       final int pos = buf.position();
@@ -185,6 +186,8 @@ public class ChunkBufferImplWithByteBufferList implements 
ChunkBuffer {
    */
   @Override
   public Iterable<ByteBuffer> iterate(int bufferSize) {
+    Preconditions.checkArgument(bufferSize > 0);
+
     return () -> new Iterator<ByteBuffer>() {
       @Override
       public boolean hasNext() {
@@ -198,10 +201,40 @@ public class ChunkBufferImplWithByteBufferList implements 
ChunkBuffer {
         }
         findCurrent();
         ByteBuffer current = buffers.get(currentIndex);
-        final ByteBuffer duplicated = current.duplicate();
-        duplicated.limit(current.limit());
-        current.position(current.limit());
-        return duplicated;
+
+        // If current buffer has enough space or it's the last one, return it.
+        if (current.remaining() >= bufferSize || currentIndex == 
buffers.size() - 1) {
+          final ByteBuffer duplicated = current.duplicate();
+          int duplicatedLimit = Math.min(current.position() + bufferSize, 
current.limit());
+          duplicated.limit(duplicatedLimit);
+          duplicated.position(current.position());
+
+          current.position(duplicatedLimit);
+          return duplicated;
+        }
+
+        // Otherwise, create a new buffer.
+        int newBufferSize = Math.min(bufferSize, remaining());
+        ByteBuffer allocated = ByteBuffer.allocate(newBufferSize);
+        int remainingToFill = allocated.remaining();
+
+        while (remainingToFill > 0) {
+          final ByteBuffer b = current();
+          int bytes = Math.min(b.remaining(), remainingToFill);
+          b.limit(b.position() + bytes);
+          allocated.put(b);
+          remainingToFill -= bytes;
+          advanceCurrent();
+        }
+
+        allocated.flip();
+
+        // Up-to-date current.
+        current = buffers.get(currentIndex);
+        // Reset
+        current.limit(current.capacity());
+
+        return allocated;
       }
     };
   }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
index 072c07be64..b06b4b5633 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -67,6 +67,76 @@ public class TestChunkBufferImplWithByteBufferList {
     assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
   }
 
+  @Test
+  public void testIterateSmallerOverSingleChunk() {
+    ChunkBuffer subject = 
ChunkBuffer.wrap(ImmutableList.of(ByteBuffer.allocate(100)));
+
+    assertEquals(0, subject.position());
+    assertEquals(100, subject.remaining());
+    assertEquals(100, subject.limit());
+
+    subject.iterate(25).forEach(buffer -> assertEquals(25, 
buffer.remaining()));
+
+    assertEquals(100, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(100, subject.limit());
+  }
+
+  @Test
+  public void testIterateOverMultipleChunksFitChunkSize() {
+    ByteBuffer b1 = ByteBuffer.allocate(100);
+    ByteBuffer b2 = ByteBuffer.allocate(100);
+    ByteBuffer b3 = ByteBuffer.allocate(100);
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));
+
+    assertEquals(0, subject.position());
+    assertEquals(300, subject.remaining());
+    assertEquals(300, subject.limit());
+
+    subject.iterate(100).forEach(buffer -> assertEquals(100, 
buffer.remaining()));
+
+    assertEquals(300, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(300, subject.limit());
+  }
+
+  @Test
+  public void testIterateOverMultipleChunksSmallerChunks() {
+    ByteBuffer b1 = ByteBuffer.allocate(100);
+    ByteBuffer b2 = ByteBuffer.allocate(100);
+    ByteBuffer b3 = ByteBuffer.allocate(100);
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));
+
+    assertEquals(0, subject.position());
+    assertEquals(300, subject.remaining());
+    assertEquals(300, subject.limit());
+
+    subject.iterate(50).forEach(buffer -> assertEquals(50, 
buffer.remaining()));
+
+    assertEquals(300, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(300, subject.limit());
+  }
+
+  @Test
+  public void testIterateOverMultipleChunksBiggerChunks() {
+    ByteBuffer b1 = ByteBuffer.allocate(100);
+    ByteBuffer b2 = ByteBuffer.allocate(100);
+    ByteBuffer b3 = ByteBuffer.allocate(100);
+    ByteBuffer b4 = ByteBuffer.allocate(100);
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3, b4));
+
+    assertEquals(0, subject.position());
+    assertEquals(400, subject.remaining());
+    assertEquals(400, subject.limit());
+
+    subject.iterate(200).forEach(buffer -> assertEquals(200, 
buffer.remaining()));
+
+    assertEquals(400, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(400, subject.limit());
+  }
+
   private static void assertEmpty(ChunkBuffer subject) {
     assertEquals(0, subject.position());
     assertEquals(0, subject.remaining());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to