This is an automated email from the ASF dual-hosted git repository.
leo65535 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d310ea4 [Bug][connector] Fix flink FileSink json format unsupports
some atomic type which is not a basic type (#1334)
d310ea4 is described below
commit d310ea42b776db3b98533a1d43ac371e0406ea71
Author: kalencaya <[email protected]>
AuthorDate: Tue Mar 1 19:19:18 2022 +0800
[Bug][connector] Fix flink FileSink json format unsupports some atomic type
which is not a basic type (#1334)
* fix: JsonRowOutputFormat supports java.sql.Date Time or Timestamp type.
* fix: JsonRowOutputFormat supports java.sql.Date Time or Timestamp type.
* fix: remove useless import.
Co-authored-by: wangqi <[email protected]>
---
.../main/java/org/apache/seatunnel/flink/sink/JsonRowOutputFormat.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/JsonRowOutputFormat.java
b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/JsonRowOutputFormat.java
index 99a68a8..0201d4d 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/JsonRowOutputFormat.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/JsonRowOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.flink.sink;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -98,7 +99,7 @@ public class JsonRowOutputFormat extends
FileOutputFormat<Row> {
for (String name : fieldNames) {
Object field = record.getField(i);
final TypeInformation type = rowTypeInfo.getTypeAt(i);
- if (type.isBasicType()) {
+ if (type instanceof AtomicType) {
json.put(name, field);
} else if (type instanceof ObjectArrayTypeInfo) {
ObjectArrayTypeInfo arrayTypeInfo = (ObjectArrayTypeInfo) type;