This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6063d0e9 CASSSIDECAR-405: Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (#314)
6063d0e9 is described below

commit 6063d0e90e3add43a7ea7d9cc8f89092adf7e831
Author: Jyothsna konisa <[email protected]>
AuthorDate: Tue Feb 10 14:57:31 2026 -0800

    CASSSIDECAR-405: Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (#314)
    
    Patch by Jyothsna Konisa; Reviewed by Yifan Cai and Bernardo Botella for CASSSIDECAR-405
---
 .../cassandra/sidecar/client/StreamBuffer.java     |  9 ++--
 .../cassandra/sidecar/client/StreamBufferTest.java | 54 ++++++++++++++++++++--
 .../sidecar/client/VertxStreamBuffer.java          |  1 -
 .../sidecar/client/VertxStreamBufferTest.java      | 54 ++++++++++++++++++++--
 4 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java b/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
index d5e7e9c9..e1786baf 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
@@ -27,11 +27,15 @@ import java.nio.ByteBuffer;
 public interface StreamBuffer
 {
     /**
-     * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}
+     * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}.
+     * <p>
+     * This method writes {@code length} bytes starting at the destination buffer's current position
+     * and advances the position by {@code length}. The caller is responsible for calling
+     * {@link ByteBuffer#flip()} on the destination buffer before reading from it.
      *
      * @param sourceOffset the offset within the {@link StreamBuffer} to be read; must be non-negative and
      *                     larger than the buffer length
-     * @param destination  a {@link ByteBuffer} where the data will be copied
+     * @param destination  a {@link ByteBuffer} where the data will be copied at its current position
      * @param length       the number of bytes to be copied from the {@link StreamBuffer}; must be non-negative and
      *                     larger than the {@code buffer.length - sourceOffset}
      */
@@ -94,7 +98,6 @@ public interface StreamBuffer
         public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
         {
             destination.put(bytes, sourceOffset, length);
-            destination.flip();
         }
 
         @Override
diff --git a/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java b/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
index 253fd2d2..ec1dd79f 100644
--- a/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
+++ b/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
@@ -67,8 +67,9 @@ class StreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 11);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(11);
+        assertThat(destination.position()).isEqualTo(11);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello World");
@@ -81,8 +82,9 @@ class StreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 5);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(5);
+        assertThat(destination.position()).isEqualTo(5);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -98,4 +100,48 @@ class StreamBufferTest
             assertThat(streamBuffer.getByte(i)).isEqualTo((byte) inputString.charAt(i));
         }
     }
+
+    @Test
+    void testMultipleChunkCopyToSameBuffer()
+    {
+        // Simulates BufferingInputStream filling a buffer with multiple chunks
+        // This validates the fix for the flip() bug where chunks were overwriting instead of appending
+
+        // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", "DDDD"
+        String sourceData = "AAAABBBBCCCCDDDD";
+        StreamBuffer streamBuffer = StreamBuffer.wrap(sourceData.getBytes(StandardCharsets.UTF_8));
+
+        // Destination buffer to accumulate all chunks (16 bytes total)
+        ByteBuffer destination = ByteBuffer.allocate(16);
+
+        // Write chunks one at a time (simulating multi-chunk read)
+        streamBuffer.copyBytes(0, destination, 4);   // Write "AAAA" at position 0
+        assertThat(destination.position()).isEqualTo(4);  // Position should advance to 4
+
+        streamBuffer.copyBytes(4, destination, 4);   // Write "BBBB" at position 4
+        assertThat(destination.position()).isEqualTo(8);  // Position should advance to 8
+
+        streamBuffer.copyBytes(8, destination, 4);   // Write "CCCC" at position 8
+        assertThat(destination.position()).isEqualTo(12); // Position should advance to 12
+
+        streamBuffer.copyBytes(12, destination, 4);  // Write "DDDD" at position 12
+        assertThat(destination.position()).isEqualTo(16); // Position should advance to 16
+
+        // Buffer should be completely filled
+        assertThat(destination.remaining()).isEqualTo(0);
+
+        // Flip to read mode and verify all chunks are present (not overwritten)
+        destination.flip();
+        byte[] result = new byte[16];
+        destination.get(result);
+
+        String resultString = new String(result, StandardCharsets.UTF_8);
+        assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+        // Verify each chunk individually
+        assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+        assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+        assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+        assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+    }
 }
diff --git a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
index 8a194852..94ec87ad 100644
--- a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
+++ b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
@@ -46,7 +46,6 @@ public class VertxStreamBuffer implements StreamBuffer
     public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
     {
         destination.put(buffer.getBytes(sourceOffset, sourceOffset + length));
-        destination.flip();
     }
 
     /**
diff --git a/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
index eb743c9f..d0e1b5e7 100644
--- a/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
+++ b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
@@ -41,8 +41,9 @@ class VertxStreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 11);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(11);
+        assertThat(destination.position()).isEqualTo(11);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello World");
@@ -56,8 +57,9 @@ class VertxStreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 5);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(5);
+        assertThat(destination.position()).isEqualTo(5);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -93,4 +95,48 @@ class VertxStreamBufferTest
         assertThat(streamBuffer.readableBytes()).isEqualTo(8);
         streamBuffer.release(); // does nothing
     }
+
+    @Test
+    void testMultipleChunkCopyToSameBuffer()
+    {
+        // Simulates BufferingInputStream filling a buffer with multiple chunks
+        // This validates the fix for the flip() bug where chunks were overwriting instead of appending
+
+        // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", "DDDD"
+        Buffer buffer = new BufferImpl().appendString("AAAABBBBCCCCDDDD");
+        StreamBuffer streamBuffer = new VertxStreamBuffer(buffer);
+
+        // Destination buffer to accumulate all chunks (16 bytes total)
+        ByteBuffer destination = ByteBuffer.allocate(16);
+
+        // Write chunks one at a time (simulating multi-chunk read)
+        streamBuffer.copyBytes(0, destination, 4);   // Write "AAAA" at position 0
+        assertThat(destination.position()).isEqualTo(4);  // Position should advance to 4
+
+        streamBuffer.copyBytes(4, destination, 4);   // Write "BBBB" at position 4
+        assertThat(destination.position()).isEqualTo(8);  // Position should advance to 8
+
+        streamBuffer.copyBytes(8, destination, 4);   // Write "CCCC" at position 8
+        assertThat(destination.position()).isEqualTo(12); // Position should advance to 12
+
+        streamBuffer.copyBytes(12, destination, 4);  // Write "DDDD" at position 12
+        assertThat(destination.position()).isEqualTo(16); // Position should advance to 16
+
+        // Buffer should be completely filled
+        assertThat(destination.remaining()).isEqualTo(0);
+
+        // Flip to read mode and verify all chunks are present (not overwritten)
+        destination.flip();
+        byte[] result = new byte[16];
+        destination.get(result);
+
+        String resultString = new String(result, StandardCharsets.UTF_8);
+        assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+        // Verify each chunk individually
+        assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+        assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+        assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+        assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to