This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new be1c5ce  [fix] fixed largeint type reads to adapt to Doris 2.0 (#173)
be1c5ce is described below

commit be1c5cee79070c0e360918d4a410cafff13e7625
Author: zy-kkk <[email protected]>
AuthorDate: Tue Aug 8 17:51:08 2023 +0800

    [fix] fixed largeint type reads to adapt to Doris 2.0 (#173)
---
 .../apache/doris/flink/serialization/RowBatch.java | 33 ++++++++++++++++++++++
 .../org/apache/doris/flink/DorisSourceExample.java |  3 +-
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index a3564d1..de63a6e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -22,6 +22,7 @@ import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
@@ -276,6 +278,37 @@ public class RowBatch {
                 addValueToRow(rowIndex, parse);
                 break;
             case "LARGEINT":
+                if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY) &&
+                        !minorType.equals(Types.MinorType.VARCHAR)) return 
false;
+                if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) {
+                    FixedSizeBinaryVector largeIntVector = 
(FixedSizeBinaryVector) fieldVector;
+                    if (largeIntVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
+                        break;
+                    }
+                    byte[] bytes = largeIntVector.get(rowIndex);
+                    int left = 0, right = bytes.length - 1;
+                    while (left < right) {
+                        byte temp = bytes[left];
+                        bytes[left] = bytes[right];
+                        bytes[right] = temp;
+                        left++;
+                        right--;
+                    }
+                    BigInteger largeInt = new BigInteger(bytes);
+                    addValueToRow(rowIndex, largeInt);
+                    break;
+                } else {
+                    VarCharVector largeIntVector = (VarCharVector) fieldVector;
+                    if (largeIntVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
+                        break;
+                    }
+                    stringValue = new String(largeIntVector.get(rowIndex));
+                    BigInteger largeInt = new BigInteger(stringValue);
+                    addValueToRow(rowIndex, largeInt);
+                    break;
+                }
             case "CHAR":
             case "VARCHAR":
             case "STRING":
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index 35857dc..d4e472e 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
 
 public class DorisSourceExample {
 
@@ -58,7 +57,7 @@ public class DorisSourceExample {
         final Table result = tEnv.sqlQuery("SELECT * from doris_source  ");
 
         // print the result to the console
-        tEnv.toRetractStream(result, Row.class).print();
+        tEnv.toDataStream(result).print();
         env.execute();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to