This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6063d0e9 CASSSIDECAR-405: Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (#314)
6063d0e9 is described below
commit 6063d0e90e3add43a7ea7d9cc8f89092adf7e831
Author: Jyothsna konisa <[email protected]>
AuthorDate: Tue Feb 10 14:57:31 2026 -0800
CASSSIDECAR-405: Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (#314)
Patch by Jyothsna Konisa; Reviewed by Yifan Cai and Bernardo Botella for CASSSIDECAR-405
---
.../cassandra/sidecar/client/StreamBuffer.java | 9 ++--
.../cassandra/sidecar/client/StreamBufferTest.java | 54 ++++++++++++++++++++--
.../sidecar/client/VertxStreamBuffer.java | 1 -
.../sidecar/client/VertxStreamBufferTest.java | 54 ++++++++++++++++++++--
4 files changed, 106 insertions(+), 12 deletions(-)
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java b/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
index d5e7e9c9..e1786baf 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/StreamBuffer.java
@@ -27,11 +27,15 @@ import java.nio.ByteBuffer;
public interface StreamBuffer
{
/**
- * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer
destination}
+ * Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer
destination}.
+ * <p>
+ * This method writes {@code length} bytes starting at the destination
buffer's current position
+ * and advances the position by {@code length}. The caller is responsible
for calling
+ * {@link ByteBuffer#flip()} on the destination buffer before reading from
it.
*
* @param sourceOffset the offset within the {@link StreamBuffer} to be
read; must be non-negative and
* larger than the buffer length
- * @param destination a {@link ByteBuffer} where the data will be copied
+ * @param destination a {@link ByteBuffer} where the data will be copied
at its current position
* @param length the number of bytes to be copied from the {@link
StreamBuffer}; must be non-negative and
* larger than the {@code buffer.length - sourceOffset}
*/
@@ -94,7 +98,6 @@ public interface StreamBuffer
public void copyBytes(int sourceOffset, ByteBuffer destination, int
length)
{
destination.put(bytes, sourceOffset, length);
- destination.flip();
}
@Override
diff --git a/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java b/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
index 253fd2d2..ec1dd79f 100644
--- a/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
+++ b/client/src/test/java/org/apache/cassandra/sidecar/client/StreamBufferTest.java
@@ -67,8 +67,9 @@ class StreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 11);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(11);
+ assertThat(destination.position()).isEqualTo(11);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello
World");
@@ -81,8 +82,9 @@ class StreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 5);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(5);
+ assertThat(destination.position()).isEqualTo(5);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -98,4 +100,48 @@ class StreamBufferTest
assertThat(streamBuffer.getByte(i)).isEqualTo((byte)
inputString.charAt(i));
}
}
+
+ @Test
+ void testMultipleChunkCopyToSameBuffer()
+ {
+ // Simulates BufferingInputStream filling a buffer with multiple chunks
+ // This validates the fix for the flip() bug where chunks were
overwriting instead of appending
+
+ // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC",
"DDDD"
+ String sourceData = "AAAABBBBCCCCDDDD";
+ StreamBuffer streamBuffer =
StreamBuffer.wrap(sourceData.getBytes(StandardCharsets.UTF_8));
+
+ // Destination buffer to accumulate all chunks (16 bytes total)
+ ByteBuffer destination = ByteBuffer.allocate(16);
+
+ // Write chunks one at a time (simulating multi-chunk read)
+ streamBuffer.copyBytes(0, destination, 4); // Write "AAAA" at
position 0
+ assertThat(destination.position()).isEqualTo(4); // Position should
advance to 4
+
+ streamBuffer.copyBytes(4, destination, 4); // Write "BBBB" at
position 4
+ assertThat(destination.position()).isEqualTo(8); // Position should
advance to 8
+
+ streamBuffer.copyBytes(8, destination, 4); // Write "CCCC" at
position 8
+ assertThat(destination.position()).isEqualTo(12); // Position should
advance to 12
+
+ streamBuffer.copyBytes(12, destination, 4); // Write "DDDD" at
position 12
+ assertThat(destination.position()).isEqualTo(16); // Position should
advance to 16
+
+ // Buffer should be completely filled
+ assertThat(destination.remaining()).isEqualTo(0);
+
+ // Flip to read mode and verify all chunks are present (not
overwritten)
+ destination.flip();
+ byte[] result = new byte[16];
+ destination.get(result);
+
+ String resultString = new String(result, StandardCharsets.UTF_8);
+ assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+ // Verify each chunk individually
+ assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+ assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+ assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+ assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+ }
}
diff --git a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
index 8a194852..94ec87ad 100644
--- a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
+++ b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxStreamBuffer.java
@@ -46,7 +46,6 @@ public class VertxStreamBuffer implements StreamBuffer
public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
{
destination.put(buffer.getBytes(sourceOffset, sourceOffset + length));
- destination.flip();
}
/**
diff --git a/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
index eb743c9f..d0e1b5e7 100644
--- a/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
+++ b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxStreamBufferTest.java
@@ -41,8 +41,9 @@ class VertxStreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 11);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(11);
+ assertThat(destination.position()).isEqualTo(11);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello
World");
@@ -56,8 +57,9 @@ class VertxStreamBufferTest
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 5);
assertThat(destination.hasArray()).isTrue();
- assertThat(destination.position()).isEqualTo(0);
- assertThat(destination.limit()).isEqualTo(5);
+ assertThat(destination.position()).isEqualTo(5);
+ assertThat(destination.limit()).isEqualTo(20);
+ destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
@@ -93,4 +95,48 @@ class VertxStreamBufferTest
assertThat(streamBuffer.readableBytes()).isEqualTo(8);
streamBuffer.release(); // does nothing
}
+
+ @Test
+ void testMultipleChunkCopyToSameBuffer()
+ {
+ // Simulates BufferingInputStream filling a buffer with multiple chunks
+ // This validates the fix for the flip() bug where chunks were
overwriting instead of appending
+
+ // Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC",
"DDDD"
+ Buffer buffer = new BufferImpl().appendString("AAAABBBBCCCCDDDD");
+ StreamBuffer streamBuffer = new VertxStreamBuffer(buffer);
+
+ // Destination buffer to accumulate all chunks (16 bytes total)
+ ByteBuffer destination = ByteBuffer.allocate(16);
+
+ // Write chunks one at a time (simulating multi-chunk read)
+ streamBuffer.copyBytes(0, destination, 4); // Write "AAAA" at
position 0
+ assertThat(destination.position()).isEqualTo(4); // Position should
advance to 4
+
+ streamBuffer.copyBytes(4, destination, 4); // Write "BBBB" at
position 4
+ assertThat(destination.position()).isEqualTo(8); // Position should
advance to 8
+
+ streamBuffer.copyBytes(8, destination, 4); // Write "CCCC" at
position 8
+ assertThat(destination.position()).isEqualTo(12); // Position should
advance to 12
+
+ streamBuffer.copyBytes(12, destination, 4); // Write "DDDD" at
position 12
+ assertThat(destination.position()).isEqualTo(16); // Position should
advance to 16
+
+ // Buffer should be completely filled
+ assertThat(destination.remaining()).isEqualTo(0);
+
+ // Flip to read mode and verify all chunks are present (not
overwritten)
+ destination.flip();
+ byte[] result = new byte[16];
+ destination.get(result);
+
+ String resultString = new String(result, StandardCharsets.UTF_8);
+ assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");
+
+ // Verify each chunk individually
+ assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
+ assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
+ assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
+ assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]