This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a396015109 [arrow] Fix Arrow to Paimon vector conversion casts (#8138)
a396015109 is described below
commit a39601510945c2baf0b75bf9d80a99a50eb0e0f5
Author: yuxia Luo <[email protected]>
AuthorDate: Sat Jun 6 16:38:02 2026 +0800
[arrow] Fix Arrow to Paimon vector conversion casts (#8138)
---
.../converter/Arrow2PaimonVectorConverter.java | 63 ++++++++++++++--------
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 55 +++++++++++++++++++
2 files changed, 95 insertions(+), 23 deletions(-)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
index e1fe66883a..ce0d067e63 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
@@ -82,7 +82,10 @@ import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
@@ -195,7 +198,12 @@ public interface Arrow2PaimonVectorConverter {
@Override
public Bytes getBytes(int index) {
- byte[] bytes = ((VarBinaryVector)
vector).getObject(index);
+ byte[] bytes;
+ if (vector instanceof FixedSizeBinaryVector) {
+ bytes = ((FixedSizeBinaryVector)
vector).get(index);
+ } else {
+ bytes = ((VarBinaryVector)
vector).getObject(index);
+ }
return new Bytes(bytes, 0, bytes.length) {
@Override
public byte[] getBytes() {
@@ -378,7 +386,7 @@ public interface Arrow2PaimonVectorConverter {
@Override
public int getInt(int index) {
- return ((TimeMilliVector) vector).get(index);
+ return getTimeInMillis(vector, index);
}
};
}
@@ -396,16 +404,7 @@ public interface Arrow2PaimonVectorConverter {
@Override
public Timestamp getTimestamp(int i, int precision) {
long value = ((TimeStampVector) vector).get(i);
- if (precision == 0) {
- return Timestamp.fromEpochMillis(value * 1000);
- } else if (precision >= 1 && precision <= 3) {
- return Timestamp.fromEpochMillis(value);
- } else if (precision >= 4 && precision <= 6) {
- return Timestamp.fromMicros(value);
- } else {
- return Timestamp.fromEpochMillis(
- value / 1_000_000, (int) (value %
1_000_000));
- }
+ return convertEpochToTimestamp(value, precision);
}
};
}
@@ -422,21 +421,39 @@ public interface Arrow2PaimonVectorConverter {
@Override
public Timestamp getTimestamp(int i, int precision) {
- long value = (long) vector.getObject(i);
- if (precision == 0) {
- return Timestamp.fromEpochMillis(value * 1000);
- } else if (precision >= 1 && precision <= 3) {
- return Timestamp.fromEpochMillis(value);
- } else if (precision >= 4 && precision <= 6) {
- return Timestamp.fromMicros(value);
- } else {
- return Timestamp.fromEpochMillis(
- value / 1_000_000, (int) (value %
1_000_000));
- }
+ long value = ((TimeStampVector) vector).get(i);
+ return convertEpochToTimestamp(value, precision);
}
};
}
+ /**
+  * Reads the time-of-day stored at {@code index} of an Arrow time vector and normalizes it to
+  * milliseconds, whichever of the second/milli/micro/nano Arrow time units the vector uses.
+  * Sub-millisecond parts are dropped by integer division.
+  *
+  * @param vector an Arrow time vector (TimeSec, TimeMilli, TimeMicro or TimeNano)
+  * @param index row position to read
+  * @return the time-of-day in milliseconds
+  * @throws UnsupportedOperationException if {@code vector} is not one of the supported types
+  */
+ private int getTimeInMillis(FieldVector vector, int index) {
+     if (vector instanceof TimeSecVector) {
+         int seconds = ((TimeSecVector) vector).get(index);
+         return seconds * 1_000;
+     }
+     if (vector instanceof TimeMilliVector) {
+         return ((TimeMilliVector) vector).get(index);
+     }
+     if (vector instanceof TimeMicroVector) {
+         long micros = ((TimeMicroVector) vector).get(index);
+         return (int) (micros / 1_000);
+     }
+     if (vector instanceof TimeNanoVector) {
+         long nanos = ((TimeNanoVector) vector).get(index);
+         return (int) (nanos / 1_000_000);
+     }
+     throw new UnsupportedOperationException(
+             "Unsupported Arrow time vector: " + vector.getClass().getName());
+ }
+
+ /**
+  * Converts a raw epoch value read from an Arrow timestamp vector into a Paimon {@link Timestamp}.
+  *
+  * <p>The unit of {@code value} is derived from {@code precision}:
+  * seconds for 0, milliseconds for 1-3, microseconds for 4-6 and nanoseconds otherwise.
+  * NOTE(review): this assumes the Arrow vector's time unit matches that mapping.
+  *
+  * @param value epoch value in the unit implied by {@code precision}; may be negative (pre-1970)
+  * @param precision Paimon timestamp precision of the target column
+  * @return the corresponding timestamp
+  */
+ private Timestamp convertEpochToTimestamp(long value, int precision) {
+     if (precision == 0) {
+         return Timestamp.fromEpochMillis(value * 1000);
+     } else if (precision >= 1 && precision <= 3) {
+         return Timestamp.fromEpochMillis(value);
+     } else if (precision >= 4 && precision <= 6) {
+         return Timestamp.fromMicros(value);
+     } else {
+         // Floor division/modulo keep the nano-of-millisecond part within [0, 999_999] for
+         // negative (pre-1970) values. Truncating '/' and '%' would give e.g. -1ns ->
+         // (0 ms, -1 ns) instead of (-1 ms, 999_999 ns).
+         long millis = Math.floorDiv(value, 1_000_000L);
+         int nanosOfMillisecond = (int) Math.floorMod(value, 1_000_000L);
+         return Timestamp.fromEpochMillis(millis, nanosOfMillisecond);
+     }
+ }
+
@Override
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
throw new UnsupportedOperationException();
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index 5e7dc95395..9d102c9ce2 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -44,7 +44,11 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -401,6 +405,57 @@ public class ArrowFormatWriterTest {
}
}
+ @Test
+ public void testArrowBundleRecordsWithTimeAndFixedBinaryVectors() {
+     // Arrow batches built by external producers may use TimeSec/TimeMicro/TimeNano and
+     // FixedSizeBinary vectors, even though Paimon's own Arrow writer currently defaults to
+     // TimeMilliVector and VarBinaryVector. Reading them back must still work.
+     RowType rowType =
+             RowType.of(
+                     new DataField(0, "time_sec", DataTypes.TIME(0)),
+                     new DataField(1, "time_micro", DataTypes.TIME(6)),
+                     new DataField(2, "time_nano", DataTypes.TIME(9)),
+                     new DataField(3, "fixed_binary", DataTypes.BINARY(3)));
+     byte[] expectedBytes = new byte[] {1, 2, 3};
+
+     try (RootAllocator allocator = new RootAllocator()) {
+         // 12 s, expected back as 12000 ms.
+         TimeSecVector secColumn = new TimeSecVector("time_sec", allocator);
+         secColumn.allocateNew(1);
+         secColumn.setSafe(0, 12);
+         secColumn.setValueCount(1);
+
+         // 12_345_678 us, expected back as 12345 ms (sub-millisecond part dropped).
+         TimeMicroVector microColumn = new TimeMicroVector("time_micro", allocator);
+         microColumn.allocateNew(1);
+         microColumn.setSafe(0, 12_345_678L);
+         microColumn.setValueCount(1);
+
+         // 12_345_678_901 ns, expected back as 12345 ms (sub-millisecond part dropped).
+         TimeNanoVector nanoColumn = new TimeNanoVector("time_nano", allocator);
+         nanoColumn.allocateNew(1);
+         nanoColumn.setSafe(0, 12_345_678_901L);
+         nanoColumn.setValueCount(1);
+
+         FixedSizeBinaryVector fixedColumn =
+                 new FixedSizeBinaryVector("fixed_binary", allocator, expectedBytes.length);
+         fixedColumn.allocateNew(1);
+         fixedColumn.setSafe(0, expectedBytes);
+         fixedColumn.setValueCount(1);
+
+         List<FieldVector> columns =
+                 Arrays.asList(secColumn, microColumn, nanoColumn, fixedColumn);
+         // Closing the root releases the column vectors before the allocator is closed.
+         try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
+             root.setRowCount(1);
+
+             Iterator<InternalRow> rows =
+                     new ArrowBundleRecords(root, rowType, true).iterator();
+             InternalRow first = rows.next();
+             assertThat(first.getInt(0)).isEqualTo(12000);
+             assertThat(first.getInt(1)).isEqualTo(12345);
+             assertThat(first.getInt(2)).isEqualTo(12345);
+             assertThat(first.getBinary(3)).containsExactly(expectedBytes);
+         }
+     }
+ }
+
@Test
public void testCWriter() {
try (ArrowFormatCWriter writer = new
ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true)) {