This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 018250af CASSANALYTICS-116 Fix ByteBuffer flip() in 
StreamBuffer.copyBytes() causing data corruption (#165)
018250af is described below

commit 018250aff4b74a8296a66285ad66aa591b52afea
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Feb 9 13:15:01 2026 -0800

    CASSANALYTICS-116 Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing 
data corruption (#165)
    
    Patch by Jyothsna Konisa; Reviewed by Yifan Cai and Bernardo Botella for 
CASSANALYTICS-116
---
 CHANGES.txt                                        |  1 +
 .../cassandra/sidecar/client/StreamBuffer.java     |  9 ++--
 .../cassandra/sidecar/client/StreamBufferTest.java | 54 ++++++++++++++++++++--
 .../sidecar/client/VertxStreamBuffer.java          |  1 -
 .../sidecar/client/VertxStreamBufferTest.java      | 54 ++++++++++++++++++++--
 .../cassandra/io/util/CdcRandomAccessReader.java   |  2 +
 .../io/util/CdcRandomAccessReaderTest.java         | 27 +++++------
 7 files changed, 121 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 94e62135..caf58b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption 
(CASSANALYTICS-116)
  * Fix race condition in DirectStreamSession#onSSTablesProduced and 
SortedSStableWriter#close (CASSANALYTICS-107)
  * Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
  * Add TimeRangeFilter to filter out SSTables outside given time window 
(CASSANALYTICS-102)
diff --git 
a/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
 
b/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
index d5e7e9c9..4664c040 100644
--- 
a/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
+++ 
b/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
@@ -27,11 +27,15 @@ import java.nio.ByteBuffer;
 public interface StreamBuffer
 {
     /**
-     * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer 
destination}
+      * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer 
destination}.
+     * <p>
+     * This method writes {@code length} bytes starting at the destination 
buffer's current position
+     * and advances the position by {@code length}. The caller is responsible 
for calling
+     * {@link ByteBuffer#flip()} on the destination buffer before reading from 
it.
      *
      * @param sourceOffset the offset within the {@link StreamBuffer} to be 
read; must be non-negative and
      *                     larger than the buffer length
-     * @param destination  a {@link ByteBuffer} where the data will be copied
+     * @param destination  a {@link ByteBuffer} where the data will be copied 
at its current position
      * @param length       the number of bytes to be copied from the {@link 
StreamBuffer}; must be non-negative and
      *                     larger than the {@code buffer.length - sourceOffset}
      */
@@ -94,7 +98,6 @@ public interface StreamBuffer
         public void copyBytes(int sourceOffset, ByteBuffer destination, int 
length)
         {
             destination.put(bytes, sourceOffset, length);
-            destination.flip();
         }
 
         @Override
diff --git 
a/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
 
b/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
index 253fd2d2..ec1dd79f 100644
--- 
a/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
+++ 
b/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
@@ -67,8 +67,9 @@ class StreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 11);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(11);
+        assertThat(destination.position()).isEqualTo(11);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello 
World");
@@ -81,8 +82,9 @@ class StreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 5);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(5);
+        assertThat(destination.position()).isEqualTo(5);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -98,4 +100,48 @@ class StreamBufferTest
             assertThat(streamBuffer.getByte(i)).isEqualTo((byte) 
inputString.charAt(i));
         }
     }
+
+    @Test
+    void testMultipleChunkCopyToSameBuffer()
+    {
+        // Simulates BufferingInputStream filling a buffer with multiple chunks
+        // This validates the fix for the flip() bug where chunks were 
overwriting instead of appending
+
+        // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", 
"DDDD"
+        String sourceData = "AAAABBBBCCCCDDDD";
+        StreamBuffer streamBuffer = 
StreamBuffer.wrap(sourceData.getBytes(StandardCharsets.UTF_8));
+
+        // Destination buffer to accumulate all chunks (16 bytes total)
+        ByteBuffer destination = ByteBuffer.allocate(16);
+
+        // Write chunks one at a time (simulating multi-chunk read)
+        streamBuffer.copyBytes(0, destination, 4);   // Write "AAAA" at 
position 0
+        assertThat(destination.position()).isEqualTo(4);  // Position should 
advance to 4
+
+        streamBuffer.copyBytes(4, destination, 4);   // Write "BBBB" at 
position 4
+        assertThat(destination.position()).isEqualTo(8);  // Position should 
advance to 8
+
+        streamBuffer.copyBytes(8, destination, 4);   // Write "CCCC" at 
position 8
+        assertThat(destination.position()).isEqualTo(12); // Position should 
advance to 12
+
+        streamBuffer.copyBytes(12, destination, 4);  // Write "DDDD" at 
position 12
+        assertThat(destination.position()).isEqualTo(16); // Position should 
advance to 16
+
+        // Buffer should be completely filled
+        assertThat(destination.remaining()).isEqualTo(0);
+
+        // Flip to read mode and verify all chunks are present (not 
overwritten)
+        destination.flip();
+        byte[] result = new byte[16];
+        destination.get(result);
+
+        String resultString = new String(result, StandardCharsets.UTF_8);
+        assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+        // Verify each chunk individually
+        assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+        assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+        assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+        assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+    }
 }
diff --git 
a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
 
b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
index 8a194852..94ec87ad 100644
--- 
a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
+++ 
b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
@@ -46,7 +46,6 @@ public class VertxStreamBuffer implements StreamBuffer
     public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
     {
         destination.put(buffer.getBytes(sourceOffset, sourceOffset + length));
-        destination.flip();
     }
 
     /**
diff --git 
a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
 
b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
index eb743c9f..d0e1b5e7 100644
--- 
a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
+++ 
b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
@@ -41,8 +41,9 @@ class VertxStreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 11);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(11);
+        assertThat(destination.position()).isEqualTo(11);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello 
World");
@@ -56,8 +57,9 @@ class VertxStreamBufferTest
         ByteBuffer destination = ByteBuffer.allocate(20);
         streamBuffer.copyBytes(0, destination, 5);
         assertThat(destination.hasArray()).isTrue();
-        assertThat(destination.position()).isEqualTo(0);
-        assertThat(destination.limit()).isEqualTo(5);
+        assertThat(destination.position()).isEqualTo(5);
+        assertThat(destination.limit()).isEqualTo(20);
+        destination.flip();
         byte[] dst = new byte[destination.limit()];
         destination.get(dst, 0, dst.length);
         assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -93,4 +95,48 @@ class VertxStreamBufferTest
         assertThat(streamBuffer.readableBytes()).isEqualTo(8);
         streamBuffer.release(); // does nothing
     }
+
+    @Test
+    void testMultipleChunkCopyToSameBuffer()
+    {
+        // Simulates BufferingInputStream filling a buffer with multiple chunks
+        // This validates the fix for the flip() bug where chunks were 
overwriting instead of appending
+
+        // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", 
"DDDD"
+        Buffer buffer = new BufferImpl().appendString("AAAABBBBCCCCDDDD");
+        StreamBuffer streamBuffer = new VertxStreamBuffer(buffer);
+
+        // Destination buffer to accumulate all chunks (16 bytes total)
+        ByteBuffer destination = ByteBuffer.allocate(16);
+
+        // Write chunks one at a time (simulating multi-chunk read)
+        streamBuffer.copyBytes(0, destination, 4);   // Write "AAAA" at 
position 0
+        assertThat(destination.position()).isEqualTo(4);  // Position should 
advance to 4
+
+        streamBuffer.copyBytes(4, destination, 4);   // Write "BBBB" at 
position 4
+        assertThat(destination.position()).isEqualTo(8);  // Position should 
advance to 8
+
+        streamBuffer.copyBytes(8, destination, 4);   // Write "CCCC" at 
position 8
+        assertThat(destination.position()).isEqualTo(12); // Position should 
advance to 12
+
+        streamBuffer.copyBytes(12, destination, 4);  // Write "DDDD" at 
position 12
+        assertThat(destination.position()).isEqualTo(16); // Position should 
advance to 16
+
+        // Buffer should be completely filled
+        assertThat(destination.remaining()).isEqualTo(0);
+
+        // Flip to read mode and verify all chunks are present (not 
overwritten)
+        destination.flip();
+        byte[] result = new byte[16];
+        destination.get(result);
+
+        String resultString = new String(result, StandardCharsets.UTF_8);
+        assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+        // Verify each chunk individually
+        assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+        assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+        assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+        assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+    }
 }
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
index d4c44cdc..ec487fc5 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
@@ -116,6 +116,7 @@ public class CdcRandomAccessReader extends 
RandomAccessReader
                     BlockingStreamConsumer streamConsumer = new 
BlockingStreamConsumer();
                     source.request(offset, end, streamConsumer);
                     streamConsumer.getBytes(buffer);
+                    buffer.flip();
                     return this;
                 }
 
@@ -127,6 +128,7 @@ public class CdcRandomAccessReader extends 
RandomAccessReader
 
                 inputStream.read(buffer);
                 assert buffer.remaining() == 0;
+                buffer.flip();
             }
             catch (IOException e)
             {
diff --git 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
index 5f3858c9..28a9f902 100644
--- 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
+++ 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
@@ -119,13 +119,12 @@ public class CdcRandomAccessReaderTest
         assertThat(holder).isNotNull();
         assertThat(rebuffer.offset()).isEqualTo(0L);
 
-        // Verify buffer was fully written (position at end, remaining = 0)
+        // Verify buffer is in read mode (flipped by rebuffer)
         ByteBuffer buffer = holder.buffer();
-        assertThat(buffer.remaining()).isEqualTo(0);
-        assertThat(buffer.position()).isEqualTo(50);
+        assertThat(buffer.position()).isEqualTo(0);
+        assertThat(buffer.remaining()).isEqualTo(50);
 
-        // Flip to read mode and verify actual byte values
-        buffer.flip();
+        // Verify actual byte values (buffer already flipped, ready to read)
         for (int i = 0; i < 50; i++)
         {
             assertThat(buffer.get()).isEqualTo((byte) i);
@@ -136,13 +135,12 @@ public class CdcRandomAccessReaderTest
         assertThat(holder).isNotNull();
         assertThat(rebuffer.offset()).isEqualTo(50L);
 
-        // Verify buffer was fully written (position at end, remaining = 0)
+        // Verify buffer is in read mode (flipped by rebuffer)
         buffer = holder.buffer();
-        assertThat(buffer.remaining()).isEqualTo(0);
-        assertThat(buffer.position()).isEqualTo(50);
+        assertThat(buffer.position()).isEqualTo(0);
+        assertThat(buffer.remaining()).isEqualTo(50);
 
-        // Flip to read mode and verify actual byte values
-        buffer.flip();
+        // Verify actual byte values (buffer already flipped, ready to read)
         for (int i = 0; i < 50; i++)
         {
             assertThat(buffer.get()).isEqualTo((byte) (50 + i));
@@ -195,13 +193,12 @@ public class CdcRandomAccessReaderTest
         assertThat(holder).isNotNull();
         assertThat(rebuffer.offset()).isEqualTo(0L);
 
-        // Verify buffer is in write mode
+        // Verify buffer is in read mode (flipped by rebuffer)
         ByteBuffer buffer = holder.buffer();
-        assertThat(buffer.remaining()).isEqualTo(0);
-        assertThat(buffer.position()).isEqualTo(50);
+        assertThat(buffer.position()).isEqualTo(0);
+        assertThat(buffer.remaining()).isEqualTo(50);
 
-        // TEST flips to verify byte values are correct
-        buffer.flip();
+        // Verify byte values are correct (buffer already flipped, ready to 
read)
         for (int i = 0; i < 50; i++)
         {
             assertThat(buffer.get()).isEqualTo((byte) i);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to