This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13
by this push:
new b03fd055 [Pick] Pick flink's date data type (#591)
b03fd055 is described below
commit b03fd055b1d5ddd04af2557839d7dfc03e6b8ae9
Author: wudi <[email protected]>
AuthorDate: Mon May 12 15:02:04 2025 +0800
[Pick] Pick flink's date data type (#591)
---
.../org/apache/doris/flink/table/DorisDynamicOutputFormat.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 03a26dbf..993dc651 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -80,6 +83,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
private final String[] fieldNames;
private final boolean jsonFormat;
private final RowData.FieldGetter[] fieldGetters;
+ private final LogicalType[] logicalTypes;
private final List batch = new ArrayList<>();
private long batchBytes = 0L;
private String fieldDelimiter;
@@ -111,6 +115,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
handleStreamloadProp();
+ this.logicalTypes = logicalTypes;
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
@@ -244,6 +249,10 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
StringJoiner value = new StringJoiner(this.fieldDelimiter);
for (int i = 0; i < rowData.getArity() && i < fieldGetters.length;
++i) {
Object field = fieldGetters[i].getFieldOrNull(rowData);
+ // Compatible date types
+ if(field != null &&
LogicalTypeRoot.DATE.equals(logicalTypes[i].getTypeRoot())) {
+ field = Date.valueOf(LocalDate.ofEpochDay((int) field));
+ }
if (jsonFormat) {
String data = field != null ? field.toString() : null;
valueMap.put(this.fieldNames[i], data);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]