This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new bc136eee [Feature](type) support doris decimal256 type (#628)
bc136eee is described below

commit bc136eeec4dd1ebac54355a752701e763d7efd76
Author: wudi <[email protected]>
AuthorDate: Mon Jan 5 19:12:36 2026 +0800

    [Feature](type) support doris decimal256 type (#628)
---
 .../apache/doris/flink/serialization/RowBatch.java | 17 ++++-
 .../doris/flink/serialization/TestRowBatch.java    | 79 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 1 deletion(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index f012b9c6..21a42614 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -24,6 +24,7 @@ import org.apache.arrow.vector.BaseIntVector;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Decimal256Vector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -219,6 +220,7 @@ public class RowBatch {
                 FieldVector fieldVector = fieldVectors.get(col);
                 MinorType minorType = fieldVector.getMinorType();
                 final String currentType = schema.get(col).getType();
+                final String colName = schema.get(col).getName();
                 for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
                     boolean passed = doConvert(col, rowIndex, minorType, 
currentType, fieldVector);
                     if (!passed) {
@@ -227,7 +229,8 @@ public class RowBatch {
                                         + currentType
                                         + ", but arrow type is "
                                         + minorType.name()
-                                        + ".");
+                                        + ", column Name is "
+                                        + colName);
                     }
                 }
             }
@@ -360,6 +363,18 @@ public class RowBatch {
                 BigDecimal value = 
decimalVector.getObject(rowIndex).stripTrailingZeros();
                 addValueToRow(rowIndex, value);
                 break;
+            case "DECIMAL256":
+                if (!minorType.equals(MinorType.DECIMAL256)) {
+                    return false;
+                }
+                Decimal256Vector decimal256Vector = (Decimal256Vector) 
fieldVector;
+                if (decimal256Vector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                BigDecimal value256 = 
decimal256Vector.getObject(rowIndex).stripTrailingZeros();
+                addValueToRow(rowIndex, value256);
+                break;
             case "DATE":
             case "DATEV2":
                 if (!minorType.equals(MinorType.DATEDAY) && 
!minorType.equals(MinorType.VARCHAR)) {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index b13cfe15..a740b2ad 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -26,6 +26,7 @@ import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Decimal256Vector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -486,6 +487,84 @@ public class TestRowBatch {
         rowBatch.next();
     }
 
+    @Test
+    public void testDecimal256() throws Exception {
+        List<Field> childrenBuilder = new ArrayList<>();
+        childrenBuilder.add(
+                new Field("k8", FieldType.nullable(new ArrowType.Decimal(38, 
18, 256)), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new 
org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), 
outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        FieldVector vector = root.getVector("k8");
+        Decimal256Vector decimal256Vector = (Decimal256Vector) vector;
+        decimal256Vector.setInitialCapacity(3);
+        decimal256Vector.allocateNew();
+        decimal256Vector.setIndexDefined(0);
+        decimal256Vector.setSafe(0, new 
BigDecimal("123456789012345678.123456789012345678"));
+        decimal256Vector.setIndexDefined(1);
+        decimal256Vector.setSafe(1, new 
BigDecimal("987654321098765432.987654321098765432"));
+        decimal256Vector.setIndexDefined(2);
+        decimal256Vector.setSafe(2, new 
BigDecimal("111111111111111111.111111111111111111"));
+        vector.setValueCount(3);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":[{\"type\":\"DECIMAL256\",\"scale\": 18,"
+                        + "\"precision\": 38, 
\"name\":\"k8\",\"comment\":\"\"}], "
+                        + "\"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        Assert.assertEquals(
+                DecimalData.fromBigDecimal(
+                        new 
BigDecimal("123456789012345678.123456789012345678"), 38, 18),
+                DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 38, 
18));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow1 = rowBatch.next();
+        Assert.assertEquals(
+                DecimalData.fromBigDecimal(
+                        new 
BigDecimal("987654321098765432.987654321098765432"), 38, 18),
+                DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 38, 
18));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow2 = rowBatch.next();
+        Assert.assertEquals(
+                DecimalData.fromBigDecimal(
+                        new 
BigDecimal("111111111111111111.111111111111111111"), 38, 18),
+                DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 38, 
18));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
+
     @Test
     public void testMap() throws IOException, DorisException {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to