This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new bc136eee [Feature](type) support doris decimal256 type (#628)
bc136eee is described below
commit bc136eeec4dd1ebac54355a752701e763d7efd76
Author: wudi <[email protected]>
AuthorDate: Mon Jan 5 19:12:36 2026 +0800
[Feature](type) support doris decimal256 type (#628)
---
.../apache/doris/flink/serialization/RowBatch.java | 17 ++++-
.../doris/flink/serialization/TestRowBatch.java | 79 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 1 deletion(-)
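For context, the core of this change is reading Arrow DECIMAL256 column values back as java.math.BigDecimal. The following standalone sketch (illustrative only; it uses plain Arrow Java APIs rather than the connector's RowBatch or test harness, and the class name Decimal256RoundTrip is hypothetical) shows the Decimal256Vector round trip that the new case in RowBatch relies on:

    import java.math.BigDecimal;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.Decimal256Vector;

    public class Decimal256RoundTrip {
        public static void main(String[] args) {
            try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
                    // precision 38 / scale 18 matches the values used in the new test
                    Decimal256Vector vector = new Decimal256Vector("d", allocator, 38, 18)) {
                vector.allocateNew(1);
                // the BigDecimal's scale must equal the vector's scale (18 here)
                vector.setSafe(0, new BigDecimal("123456789012345678.123456789012345678"));
                vector.setValueCount(1);

                // RowBatch first checks the vector's minor type, then casts and reads getObject()
                System.out.println(vector.getMinorType()); // DECIMAL256
                System.out.println(vector.getObject(0).stripTrailingZeros());
            }
        }
    }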
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index f012b9c6..21a42614 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -24,6 +24,7 @@ import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -219,6 +220,7 @@ public class RowBatch {
FieldVector fieldVector = fieldVectors.get(col);
MinorType minorType = fieldVector.getMinorType();
final String currentType = schema.get(col).getType();
+ final String colName = schema.get(col).getName();
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector);
if (!passed) {
@@ -227,7 +229,8 @@ public class RowBatch {
+ currentType
+ ", but arrow type is "
+ minorType.name()
- + ".");
+ + ", column Name is "
+ + colName);
}
}
}
@@ -360,6 +363,18 @@ public class RowBatch {
BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
addValueToRow(rowIndex, value);
break;
+ case "DECIMAL256":
+ if (!minorType.equals(MinorType.DECIMAL256)) {
+ return false;
+ }
+ Decimal256Vector decimal256Vector = (Decimal256Vector) fieldVector;
+ if (decimal256Vector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ BigDecimal value256 = decimal256Vector.getObject(rowIndex).stripTrailingZeros();
+ addValueToRow(rowIndex, value256);
+ break;
case "DATE":
case "DATEV2":
if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index b13cfe15..a740b2ad 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -26,6 +26,7 @@ import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -486,6 +487,84 @@ public class TestRowBatch {
rowBatch.next();
}
+ @Test
+ public void testDecimal256() throws Exception {
+ List<Field> childrenBuilder = new ArrayList<>();
+ childrenBuilder.add(
+ new Field("k8", FieldType.nullable(new ArrowType.Decimal(38,
18, 256)), null));
+
+ VectorSchemaRoot root =
+ VectorSchemaRoot.create(
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
+ new RootAllocator(Integer.MAX_VALUE));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ArrowStreamWriter arrowStreamWriter =
+ new ArrowStreamWriter(
+ root, new DictionaryProvider.MapDictionaryProvider(), outputStream);
+
+ arrowStreamWriter.start();
+ root.setRowCount(3);
+
+ FieldVector vector = root.getVector("k8");
+ Decimal256Vector decimal256Vector = (Decimal256Vector) vector;
+ decimal256Vector.setInitialCapacity(3);
+ decimal256Vector.allocateNew();
+ decimal256Vector.setIndexDefined(0);
+ decimal256Vector.setSafe(0, new BigDecimal("123456789012345678.123456789012345678"));
+ decimal256Vector.setIndexDefined(1);
+ decimal256Vector.setSafe(1, new BigDecimal("987654321098765432.987654321098765432"));
+ decimal256Vector.setIndexDefined(2);
+ decimal256Vector.setSafe(2, new BigDecimal("111111111111111111.111111111111111111"));
+ vector.setValueCount(3);
+
+ arrowStreamWriter.writeBatch();
+
+ arrowStreamWriter.end();
+ arrowStreamWriter.close();
+
+ TStatus status = new TStatus();
+ status.setStatusCode(TStatusCode.OK);
+ TScanBatchResult scanBatchResult = new TScanBatchResult();
+ scanBatchResult.setStatus(status);
+ scanBatchResult.setEos(false);
+ scanBatchResult.setRows(outputStream.toByteArray());
+
+ String schemaStr =
+ "{\"properties\":[{\"type\":\"DECIMAL256\",\"scale\": 18,"
+ + "\"precision\": 38,
\"name\":\"k8\",\"comment\":\"\"}], "
+ + "\"status\":200}";
+
+ Schema schema = RestService.parseSchema(schemaStr, logger);
+
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow0 = rowBatch.next();
+ Assert.assertEquals(
+ DecimalData.fromBigDecimal(
+ new BigDecimal("123456789012345678.123456789012345678"), 38, 18),
+ DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 38, 18));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow1 = rowBatch.next();
+ Assert.assertEquals(
+ DecimalData.fromBigDecimal(
+ new BigDecimal("987654321098765432.987654321098765432"), 38, 18),
+ DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 38, 18));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow2 = rowBatch.next();
+ Assert.assertEquals(
+ DecimalData.fromBigDecimal(
+ new BigDecimal("111111111111111111.111111111111111111"), 38, 18),
+ DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 38, 18));
+
+ Assert.assertFalse(rowBatch.hasNext());
+ thrown.expect(NoSuchElementException.class);
+ thrown.expectMessage(startsWith("Get row offset:"));
+ rowBatch.next();
+ }
+
@Test
public void testMap() throws IOException, DorisException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]