This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 018250af CASSANALYTICS-116 Fix ByteBuffer flip() in
StreamBuffer.copyBytes() causing data corruption (#165)
018250af is described below
commit 018250aff4b74a8296a66285ad66aa591b52afea
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Feb 9 13:15:01 2026 -0800
CASSANALYTICS-116 Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing
data corruption (#165)
Patch by Jyothsna Konisa; Reviewed by Yifan Cai and Bernardo Botella for
CASSANALYTICS-116
---
CHANGES.txt | 1 +
.../cassandra/sidecar/client/StreamBuffer.java | 9 ++--
.../cassandra/sidecar/client/StreamBufferTest.java | 54 ++++++++++++++++++++--
.../sidecar/client/VertxStreamBuffer.java | 1 -
.../sidecar/client/VertxStreamBufferTest.java | 54 ++++++++++++++++++++--
.../cassandra/io/util/CdcRandomAccessReader.java | 2 +
.../io/util/CdcRandomAccessReaderTest.java | 27 +++++------
7 files changed, 121 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 94e62135..caf58b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (CASSANALYTICS-116)
* Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSStableWriter#close (CASSANALYTICS-107)
* Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
* Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
diff --git a/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java b/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
index d5e7e9c9..4664c040 100644
--- a/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
+++ b/analytics-sidecar-client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
@@ -27,11 +27,15 @@ import java.nio.ByteBuffer;
public interface StreamBuffer
{
/**
- * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}
+ * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}.
+ * <p>
+ * This method writes {@code length} bytes starting at the destination buffer's current position
+ * and advances the position by {@code length}. The caller is responsible for calling
+ * {@link ByteBuffer#flip()} on the destination buffer before reading from it.
*
* @param sourceOffset the offset within the {@link StreamBuffer} to be read; must be non-negative and
* larger than the buffer length
- * @param destination a {@link ByteBuffer} where the data will be copied
+ * @param destination a {@link ByteBuffer} where the data will be copied at its current position
* @param length the number of bytes to be copied from the {@link StreamBuffer}; must be non-negative and
* larger than the {@code buffer.length - sourceOffset}
*/
@@ -94,7 +98,6 @@ public interface StreamBuffer
public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
{
destination.put(bytes, sourceOffset, length);
- destination.flip();
}
@Override
diff --git a/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java b/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
index 253fd2d2..ec1dd79f 100644
--- a/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
+++ b/analytics-sidecar-client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
@@ -67,8 +67,9 @@ class StreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 11);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(11);
+ assertThat(destination.position()).isEqualTo(11);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello World");
@@ -81,8 +82,9 @@ class StreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 5);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(5);
+ assertThat(destination.position()).isEqualTo(5);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -98,4 +100,48 @@ class StreamBufferTest
assertThat(streamBuffer.getByte(i)).isEqualTo((byte) inputString.charAt(i));
}
}
+
+ @Test
+ void testMultipleChunkCopyToSameBuffer()
+ {
+ // Simulates BufferingInputStream filling a buffer with multiple chunks
+ // This validates the fix for the flip() bug where chunks were overwriting instead of appending
+
+ // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", "DDDD"
+ String sourceData = "AAAABBBBCCCCDDDD";
+ StreamBuffer streamBuffer = StreamBuffer.wrap(sourceData.getBytes(StandardCharsets.UTF_8));
+
+ // Destination buffer to accumulate all chunks (16 bytes total)
+ ByteBuffer destination = ByteBuffer.allocate(16);
+
+ // Write chunks one at a time (simulating multi-chunk read)
+ streamBuffer.copyBytes(0, destination, 4); // Write "AAAA" at position 0
+ assertThat(destination.position()).isEqualTo(4); // Position should advance to 4
+
+ streamBuffer.copyBytes(4, destination, 4); // Write "BBBB" at position 4
+ assertThat(destination.position()).isEqualTo(8); // Position should advance to 8
+
+ streamBuffer.copyBytes(8, destination, 4); // Write "CCCC" at position 8
+ assertThat(destination.position()).isEqualTo(12); // Position should advance to 12
+
+ streamBuffer.copyBytes(12, destination, 4); // Write "DDDD" at position 12
+ assertThat(destination.position()).isEqualTo(16); // Position should advance to 16
+
+ // Buffer should be completely filled
+ assertThat(destination.remaining()).isEqualTo(0);
+
+ // Flip to read mode and verify all chunks are present (not overwritten)
+ destination.flip();
+ byte[] result = new byte[16];
+ destination.get(result);
+
+ String resultString = new String(result, StandardCharsets.UTF_8);
+ assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+ // Verify each chunk individually
+ assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+ assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+ assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+ assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+ }
}
diff --git a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
index 8a194852..94ec87ad 100644
--- a/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
+++ b/analytics-sidecar-vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
@@ -46,7 +46,6 @@ public class VertxStreamBuffer implements StreamBuffer
public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
{
destination.put(buffer.getBytes(sourceOffset, sourceOffset + length));
- destination.flip();
}
/**
diff --git a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
index eb743c9f..d0e1b5e7 100644
--- a/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
+++ b/analytics-sidecar-vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
@@ -41,8 +41,9 @@ class VertxStreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 11);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(11);
+ assertThat(destination.position()).isEqualTo(11);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello World");
@@ -56,8 +57,9 @@ class VertxStreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 5);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(5);
+ assertThat(destination.position()).isEqualTo(5);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -93,4 +95,48 @@ class VertxStreamBufferTest
assertThat(streamBuffer.readableBytes()).isEqualTo(8);
streamBuffer.release(); // does nothing
}
+
+ @Test
+ void testMultipleChunkCopyToSameBuffer()
+ {
+ // Simulates BufferingInputStream filling a buffer with multiple chunks
+ // This validates the fix for the flip() bug where chunks were overwriting instead of appending
+
+ // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", "DDDD"
+ Buffer buffer = new BufferImpl().appendString("AAAABBBBCCCCDDDD");
+ StreamBuffer streamBuffer = new VertxStreamBuffer(buffer);
+
+ // Destination buffer to accumulate all chunks (16 bytes total)
+ ByteBuffer destination = ByteBuffer.allocate(16);
+
+ // Write chunks one at a time (simulating multi-chunk read)
+ streamBuffer.copyBytes(0, destination, 4); // Write "AAAA" at position 0
+ assertThat(destination.position()).isEqualTo(4); // Position should advance to 4
+
+ streamBuffer.copyBytes(4, destination, 4); // Write "BBBB" at position 4
+ assertThat(destination.position()).isEqualTo(8); // Position should advance to 8
+
+ streamBuffer.copyBytes(8, destination, 4); // Write "CCCC" at position 8
+ assertThat(destination.position()).isEqualTo(12); // Position should advance to 12
+
+ streamBuffer.copyBytes(12, destination, 4); // Write "DDDD" at position 12
+ assertThat(destination.position()).isEqualTo(16); // Position should advance to 16
+
+ // Buffer should be completely filled
+ assertThat(destination.remaining()).isEqualTo(0);
+
+ // Flip to read mode and verify all chunks are present (not overwritten)
+ destination.flip();
+ byte[] result = new byte[16];
+ destination.get(result);
+
+ String resultString = new String(result, StandardCharsets.UTF_8);
+ assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+ // Verify each chunk individually
+ assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+ assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+ assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+ assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+ }
}
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
index d4c44cdc..ec487fc5 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
@@ -116,6 +116,7 @@ public class CdcRandomAccessReader extends RandomAccessReader
BlockingStreamConsumer streamConsumer = new BlockingStreamConsumer();
source.request(offset, end, streamConsumer);
streamConsumer.getBytes(buffer);
+ buffer.flip();
return this;
}
@@ -127,6 +128,7 @@ public class CdcRandomAccessReader extends RandomAccessReader
inputStream.read(buffer);
assert buffer.remaining() == 0;
+ buffer.flip();
}
catch (IOException e)
{
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
index 5f3858c9..28a9f902 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
@@ -119,13 +119,12 @@ public class CdcRandomAccessReaderTest
assertThat(holder).isNotNull();
assertThat(rebuffer.offset()).isEqualTo(0L);
- // Verify buffer was fully written (position at end, remaining = 0)
+ // Verify buffer is in read mode (flipped by rebuffer)
ByteBuffer buffer = holder.buffer();
- assertThat(buffer.remaining()).isEqualTo(0);
- assertThat(buffer.position()).isEqualTo(50);
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.remaining()).isEqualTo(50);
- // Flip to read mode and verify actual byte values
- buffer.flip();
+ // Verify actual byte values (buffer already flipped, ready to read)
for (int i = 0; i < 50; i++)
{
assertThat(buffer.get()).isEqualTo((byte) i);
@@ -136,13 +135,12 @@ public class CdcRandomAccessReaderTest
assertThat(holder).isNotNull();
assertThat(rebuffer.offset()).isEqualTo(50L);
- // Verify buffer was fully written (position at end, remaining = 0)
+ // Verify buffer is in read mode (flipped by rebuffer)
buffer = holder.buffer();
- assertThat(buffer.remaining()).isEqualTo(0);
- assertThat(buffer.position()).isEqualTo(50);
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.remaining()).isEqualTo(50);
- // Flip to read mode and verify actual byte values
- buffer.flip();
+ // Verify actual byte values (buffer already flipped, ready to read)
for (int i = 0; i < 50; i++)
{
assertThat(buffer.get()).isEqualTo((byte) (50 + i));
@@ -195,13 +193,12 @@ public class CdcRandomAccessReaderTest
assertThat(holder).isNotNull();
assertThat(rebuffer.offset()).isEqualTo(0L);
- // Verify buffer is in write mode
+ // Verify buffer is in read mode (flipped by rebuffer)
ByteBuffer buffer = holder.buffer();
- assertThat(buffer.remaining()).isEqualTo(0);
- assertThat(buffer.position()).isEqualTo(50);
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.remaining()).isEqualTo(50);
- // TEST flips to verify byte values are correct
- buffer.flip();
+ // Verify byte values are correct (buffer already flipped, ready to read)
for (int i = 0; i < 50; i++)
{
assertThat(buffer.get()).isEqualTo((byte) i);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]