This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git
The following commit(s) were added to refs/heads/main by this push:
new c2234549 GH-729: [JDBC] Fix BinaryConsumer consuming null value (#730)
c2234549 is described below
commit c2234549827488d32b6698f91e26053a293d808e
Author: wangyunlai <[email protected]>
AuthorDate: Mon Apr 28 14:23:46 2025 +0800
GH-729: [JDBC] Fix BinaryConsumer consuming null value (#730)
## What's Changed
Set the `startOffset` of the next item when `BinaryConsumer` consumes a
`null` value.
Closes #729.
---
.../adapter/jdbc/consumer/BinaryConsumer.java | 25 +++++++++++-----------
.../adapter/jdbc/consumer/BinaryConsumerTest.java | 11 +++++++++-
2 files changed, 23 insertions(+), 13 deletions(-)
diff --git
a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
index edbc6360..73ec04b8 100644
---
a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
+++
b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
@@ -51,13 +51,15 @@ public abstract class BinaryConsumer extends
BaseConsumer<VarBinaryVector> {
/** consume a InputStream. */
public void consume(InputStream is) throws IOException {
+ while (currentIndex >= vector.getValueCapacity()) {
+ vector.reallocValidityAndOffsetBuffers();
+ }
+
+ final int startOffset = vector.getStartOffset(currentIndex);
+ final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
+ int dataLength = 0;
+
if (is != null) {
- while (currentIndex >= vector.getValueCapacity()) {
- vector.reallocValidityAndOffsetBuffers();
- }
- final int startOffset = vector.getStartOffset(currentIndex);
- final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
- int dataLength = 0;
int read;
while ((read = is.read(reuseBytes)) != -1) {
while (vector.getDataBuffer().capacity() < (startOffset + dataLength +
read)) {
@@ -66,11 +68,12 @@ public abstract class BinaryConsumer extends
BaseConsumer<VarBinaryVector> {
vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes,
0, read);
dataLength += read;
}
- offsetBuffer.setInt(
- (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH),
startOffset + dataLength);
+
BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
- vector.setLastSet(currentIndex);
}
+ offsetBuffer.setInt(
+ (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH),
startOffset + dataLength);
+ vector.setLastSet(currentIndex);
}
public void moveWriterPosition() {
@@ -95,9 +98,7 @@ public abstract class BinaryConsumer extends
BaseConsumer<VarBinaryVector> {
@Override
public void consume(ResultSet resultSet) throws SQLException, IOException {
InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
- if (!resultSet.wasNull()) {
- consume(is);
- }
+ consume(is);
moveWriterPosition();
}
}
diff --git
a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
index b1e25379..bb836578 100644
---
a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
+++
b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.junit.jupiter.api.Test;
@@ -65,7 +66,11 @@ public class BinaryConsumerTest extends AbstractConsumerTest
{
nullable,
binaryConsumer -> {
for (byte[] value : values) {
- binaryConsumer.consume(new ByteArrayInputStream(value));
+ if (value != null) {
+ binaryConsumer.consume(new ByteArrayInputStream(value));
+ } else {
+ binaryConsumer.consume((InputStream) null);
+ }
binaryConsumer.moveWriterPosition();
}
},
@@ -119,5 +124,9 @@ public class BinaryConsumerTest extends
AbstractConsumerTest {
testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT);
}
testConsumeInputStream(testRecords, false);
+
+ byte[] bytes1 = new byte[] {1, 2, 3};
+ byte[] bytes2 = new byte[] {4, 5, 6};
+ testConsumeInputStream(new byte[][] {bytes1, null, bytes2}, true);
}
}