This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new be1c5ce [fix] fixed largeint type reads to adapt to Doris 2.0 (#173)
be1c5ce is described below
commit be1c5cee79070c0e360918d4a410cafff13e7625
Author: zy-kkk <[email protected]>
AuthorDate: Tue Aug 8 17:51:08 2023 +0800
[fix] fixed largeint type reads to adapt to Doris 2.0 (#173)
---
.../apache/doris/flink/serialization/RowBatch.java | 33 ++++++++++++++++++++++
.../org/apache/doris/flink/DorisSourceExample.java | 3 +-
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index a3564d1..de63a6e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -22,6 +22,7 @@ import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@@ -276,6 +278,37 @@ public class RowBatch {
addValueToRow(rowIndex, parse);
break;
case "LARGEINT":
+ if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY) &&
+ !minorType.equals(Types.MinorType.VARCHAR)) return
false;
+ if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) {
+ FixedSizeBinaryVector largeIntVector =
(FixedSizeBinaryVector) fieldVector;
+ if (largeIntVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ byte[] bytes = largeIntVector.get(rowIndex);
+ int left = 0, right = bytes.length - 1;
+ while (left < right) {
+ byte temp = bytes[left];
+ bytes[left] = bytes[right];
+ bytes[right] = temp;
+ left++;
+ right--;
+ }
+ BigInteger largeInt = new BigInteger(bytes);
+ addValueToRow(rowIndex, largeInt);
+ break;
+ } else {
+ VarCharVector largeIntVector = (VarCharVector) fieldVector;
+ if (largeIntVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ stringValue = new String(largeIntVector.get(rowIndex));
+ BigInteger largeInt = new BigInteger(stringValue);
+ addValueToRow(rowIndex, largeInt);
+ break;
+ }
case "CHAR":
case "VARCHAR":
case "STRING":
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index 35857dc..d4e472e 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
public class DorisSourceExample {
@@ -58,7 +57,7 @@ public class DorisSourceExample {
final Table result = tEnv.sqlQuery("SELECT * from doris_source ");
// print the result to the console
- tEnv.toRetractStream(result, Row.class).print();
+ tEnv.toDataStream(result).print();
env.execute();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]