This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 
by this push:
     new b03fd055 [Pick] Pick flink's date data type (#591)
b03fd055 is described below

commit b03fd055b1d5ddd04af2557839d7dfc03e6b8ae9
Author: wudi <[email protected]>
AuthorDate: Mon May 12 15:02:04 2025 +0800

    [Pick] Pick flink's date data type (#591)
---
 .../org/apache/doris/flink/table/DorisDynamicOutputFormat.java   | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 03a26dbf..993dc651 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -80,6 +83,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     private final String[] fieldNames;
     private final boolean jsonFormat;
     private final RowData.FieldGetter[] fieldGetters;
+    private final LogicalType[] logicalTypes;
     private final List batch = new ArrayList<>();
     private long batchBytes = 0L;
     private String fieldDelimiter;
@@ -111,6 +115,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
 
 
         handleStreamloadProp();
+        this.logicalTypes = logicalTypes;
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
         for (int i = 0; i < logicalTypes.length; i++) {
             fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
@@ -244,6 +249,10 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
             StringJoiner value = new StringJoiner(this.fieldDelimiter);
             for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; 
++i) {
                 Object field = fieldGetters[i].getFieldOrNull(rowData);
+                // Compatible date types
+                if(field != null && 
LogicalTypeRoot.DATE.equals(logicalTypes[i].getTypeRoot())) {
+                    field = Date.valueOf(LocalDate.ofEpochDay((int) field));
+                }
                 if (jsonFormat) {
                     String data = field != null ? field.toString() : null;
                     valueMap.put(this.fieldNames[i], data);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to