This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dd68c06b0 [Feature][Connector-V2] Add json file sink & json format
(#2385)
dd68c06b0 is described below
commit dd68c06b0accf1570c845dae833fee3c331a2b8a
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 10 09:54:04 2022 +0800
[Feature][Connector-V2] Add json file sink & json format (#2385)
* [Feature][Connector-V2] Add [File]JSON sink & json format
Co-authored-by: wanghailin <[email protected]>
---
docs/en/connector-v2/sink/HdfsFile.md | 2 +-
docs/en/connector-v2/sink/LocalFile.md | 2 +-
.../connector-file/connector-file-base/pom.xml | 5 +
.../seatunnel/file/config/FileFormat.java | 3 +-
.../hdfs/HdfsJsonTransactionStateFileWriter.java | 121 +++++++++++++
.../hdfs/HdfsTransactionStateFileWriteFactory.java | 13 ++
.../local/LocalJsonTransactionStateFileWriter.java | 122 +++++++++++++
.../LocalTransactionStateFileWriteFactory.java | 13 ++
.../e2e/flink/v2/file/FakeSourceToFileIT.java | 18 ++
.../resources/file/fakesource_to_hdfs_json.conf | 65 +++++++
.../resources/file/fakesource_to_local_json.conf | 65 +++++++
.../e2e/spark/v2/file/FakeSourceToFileIT.java | 19 +++
.../resources/file/fakesource_to_hdfs_json.conf | 64 +++++++
.../resources/file/fakesource_to_local_json.conf | 64 +++++++
.../format/json/JsonDeserializationSchema.java | 2 +-
.../seatunnel/format/json/JsonFormatFactory.java | 72 ++++++++
.../seatunnel/format/json/JsonFormatOptions.java | 36 ++++
.../format/json/JsonSerializationSchema.java | 65 +++++++
.../seatunnel/format/json/JsonToRowConverters.java | 2 +-
.../seatunnel/format/json/RowToJsonConverters.java | 188 +++++++++++++++++++++
.../apache/seatunnel/format/json/TimeFormat.java | 32 ++++
.../org.apache.seatunnel.api.table.factory.Factory | 18 ++
22 files changed, 986 insertions(+), 5 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md
b/docs/en/connector-v2/sink/HdfsFile.md
index 2513d59c1..cbc2d616a 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -40,7 +40,7 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
We supported as the following file types:
-`text` `csv` `parquet` `orc`
+`text` `csv` `parquet` `orc` `json`
Please note that, The final file name will ends with the file_format's suffix,
the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/LocalFile.md
b/docs/en/connector-v2/sink/LocalFile.md
index f7e820cc8..773c625af 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -38,7 +38,7 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
We supported as the following file types:
-`text` `csv` `parquet` `orc`
+`text` `csv` `parquet` `orc` `json`
Please note that, The final file name will ends with the file_format's suffix,
the suffix of the text file is `txt`.
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index b046c2278..05e98573e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -35,6 +35,11 @@
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index c02f58168..f9ca34409 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -23,7 +23,8 @@ public enum FileFormat implements Serializable {
CSV("csv"),
TEXT("txt"),
PARQUET("parquet"),
- ORC("orc");
+ ORC("orc"),
+ JSON("json");
private String suffix;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsJsonTransactionStateFileWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsJsonTransactionStateFileWriter.java
new file mode 100644
index 000000000..ec21990ee
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsJsonTransactionStateFileWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class HdfsJsonTransactionStateFileWriter extends
AbstractTransactionStateFileWriter {
+
+ private static final long serialVersionUID = -5432828969702531646L;
+
+ private final byte[] rowDelimiter;
+ private final SerializationSchema serializationSchema;
+ private Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+ public HdfsJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType
seaTunnelRowTypeInfo,
+ @NonNull
TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull
PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer>
sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem) {
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator,
partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId,
subTaskIndex, fileSystem);
+
+ this.rowDelimiter = rowDelimiter.getBytes();
+ this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowTypeInfo);
+ beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
+ try {
+ byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+ fsDataOutputStream.write(rowBytes);
+ fsDataOutputStream.write(rowDelimiter);
+ } catch (IOException e) {
+ log.error("write data to file {} error", filePath);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ beingWrittenOutputStream.entrySet().forEach(entry -> {
+ try {
+ entry.getValue().flush();
+ } catch (IOException e) {
+ log.error("error when flush file {}", entry.getKey());
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ entry.getValue().close();
+ } catch (IOException e) {
+ log.error("error when close output stream {}",
entry.getKey());
+ }
+ }
+
+ needMoveFiles.put(entry.getKey(),
getTargetLocation(entry.getKey()));
+ });
+ }
+
+ private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
+ if (fsDataOutputStream == null) {
+ try {
+ fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+ } catch (IOException e) {
+ log.error("can not get output file stream");
+ throw new RuntimeException(e);
+ }
+ }
+ return fsDataOutputStream;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
index f4c5cd840..1761b799d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
@@ -85,6 +85,19 @@ public class HdfsTransactionStateFileWriteFactory {
subTaskIndex,
fileSystem);
}
+ if (fileFormat.equals(FileFormat.JSON)) {
+ return new HdfsJsonTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ rowDelimiter,
+ fileSystem);
+ }
// if file type not supported by file connector, default txt writer
will be generated
return new HdfsTxtTransactionStateFileWriter(
seaTunnelRowTypeInfo,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalJsonTransactionStateFileWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalJsonTransactionStateFileWriter.java
new file mode 100644
index 000000000..e087bfff9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalJsonTransactionStateFileWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class LocalJsonTransactionStateFileWriter extends
AbstractTransactionStateFileWriter {
+
+ private static final long serialVersionUID = -3834472539886339383L;
+
+ private final byte[] rowDelimiter;
+ private final SerializationSchema serializationSchema;
+ private Map<String, FileOutputStream> beingWrittenOutputStream;
+
+ public LocalJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType
seaTunnelRowTypeInfo,
+ @NonNull
TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull
PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer>
sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem)
{
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator,
partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId,
subTaskIndex, fileSystem);
+
+ this.rowDelimiter = rowDelimiter.getBytes();
+ this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowTypeInfo);
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ FileOutputStream fileOutputStream = getOrCreateOutputStream(filePath);
+ try {
+ byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+ fileOutputStream.write(rowBytes);
+ fileOutputStream.write(rowDelimiter);
+ } catch (IOException e) {
+ log.error("write data to file {} error", filePath);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ beingWrittenOutputStream.entrySet().forEach(entry -> {
+ try {
+ entry.getValue().flush();
+ } catch (IOException e) {
+ log.error("error when flush file {}", entry.getKey());
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ entry.getValue().close();
+ } catch (IOException e) {
+ log.error("error when close output stream {}",
entry.getKey());
+ }
+ }
+
+ needMoveFiles.put(entry.getKey(),
getTargetLocation(entry.getKey()));
+ });
+ }
+
+ private FileOutputStream getOrCreateOutputStream(@NonNull String filePath)
{
+ FileOutputStream fileOutputStream =
beingWrittenOutputStream.get(filePath);
+ if (fileOutputStream == null) {
+ try {
+ FileUtils.createFile(filePath);
+ fileOutputStream = new FileOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, fileOutputStream);
+ } catch (IOException e) {
+ log.error("can not get output file stream");
+ throw new RuntimeException(e);
+ }
+ }
+ return fileOutputStream;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
index 05e92d5fd..2efab19b7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
@@ -85,6 +85,19 @@ public class LocalTransactionStateFileWriteFactory {
subTaskIndex,
fileSystem);
}
+ if (fileFormat.equals(FileFormat.JSON)) {
+ return new LocalJsonTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ rowDelimiter,
+ fileSystem);
+ }
// if file type not supported by file connector, default txt writer
will be generated
return new LocalTxtTransactionStateFileWriter(
seaTunnelRowTypeInfo,
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
index 9ac33002e..50d7c34d2 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
@@ -45,6 +45,15 @@ public class FakeSourceToFileIT extends FlinkContainer {
Assertions.assertEquals(0, execResult.getExitCode());
}
+ /**
+ * fake source -> local json file sink
+ */
+ @Test
+ public void testFakeSourceToLocalFileJson() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/file/fakesource_to_local_json.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
/**
* fake source -> hdfs text file sink
*/
@@ -62,4 +71,13 @@ public class FakeSourceToFileIT extends FlinkContainer {
Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_parquet.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ /**
+ * fake source -> hdfs json file sink
+ */
+ @Test
+ public void testFakeSourceToHdfsFileJson() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_json.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
new file mode 100644
index 000000000..769c8760d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ HdfsFile {
+ path="/tmp/hive/warehouse/test2"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="json"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
new file mode 100644
index 000000000..b18b472f6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ LocalFile {
+ path="/tmp/hive/warehouse/test2"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="json"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
index fe48fb974..fccd8db06 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
@@ -49,6 +49,16 @@ public class FakeSourceToFileIT extends SparkContainer {
Assertions.assertEquals(0, execResult.getExitCode());
}
+ /**
+ * fake source -> local json file sink
+ */
+ @Test
+ public void testFakeSourceToLocalFileJson() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+
/**
* fake source -> hdfs text file sink
*/
@@ -66,4 +76,13 @@ public class FakeSourceToFileIT extends SparkContainer {
Container.ExecResult execResult =
executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_parquet.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ /**
+ * fake source -> hdfs json file sink
+ */
+ @Test
+ public void testFakeSourceToHdfsFileJson() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_json.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
new file mode 100644
index 000000000..c4d1aabe5
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ HdfsFile {
+ path="/tmp/hive/warehouse/test2"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="json"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
new file mode 100644
index 000000000..d257f81bb
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ LocalFile {
+ path="/tmp/hive/warehouse/test2"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="json"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index 9808c80e6..202f62b85 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -54,7 +54,7 @@ public class JsonDeserializationSchema implements
DeserializationSchema<SeaTunne
private final SeaTunnelRowType rowType;
/**
- * Runtime converter that converts {@link JsonNode}s into objects of Flink
SQL internal data
+ * Runtime converter that converts {@link JsonNode}s into objects of
internal data
* structures.
*/
private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java
new file mode 100644
index 000000000..3b1dce8f6
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+public class JsonFormatFactory implements DeserializationFormatFactory,
SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "json";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ // TODO config option rules
+ return OptionRule.builder().build();
+ }
+
+ @Override
+ public DeserializationFormat
createDeserializationFormat(TableFactoryContext context) {
+ Map<String, String> options = context.getOptions();
+ boolean failOnMissingField =
JsonFormatOptions.getFailOnMissingField(options);
+ boolean ignoreParseErrors =
JsonFormatOptions.getIgnoreParseErrors(options);
+
+ // TODO config SeaTunnelRowType
+ return new DeserializationFormat() {
+ @Override
+ public DeserializationSchema createDeserializationSchema() {
+ return new JsonDeserializationSchema(failOnMissingField,
ignoreParseErrors, null);
+ }
+ };
+ }
+
+ @Override
+ public SerializationFormat createSerializationFormat(TableFactoryContext
context) {
+ // TODO config SeaTunnelRowType
+ return new SerializationFormat() {
+ @Override
+ public SerializationSchema createSerializationSchema() {
+ return new JsonSerializationSchema(null);
+ }
+ };
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
new file mode 100644
index 000000000..47e8dd296
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import java.util.Map;
+
/**
 * Helpers for reading JSON-format options out of a plain string-to-string option map.
 *
 * <p>Both options default to {@code false} when absent; values are parsed
 * case-insensitively via {@link Boolean#parseBoolean(String)}, so any value other
 * than "true" (ignoring case) also yields {@code false}.
 */
public final class JsonFormatOptions {

    // Option key; presumably controls failing vs. tolerating a missing declared
    // field during deserialization — semantics live in JsonDeserializationSchema.
    public static final String FAIL_ON_MISSING_FIELD = "fail_on_missing_field";

    // Option key; presumably controls skipping unparseable records instead of
    // failing — semantics live in JsonDeserializationSchema.
    public static final String IGNORE_PARSE_ERRORS = "ignore_parse_errors";

    /** Utility class; not instantiable. */
    private JsonFormatOptions() {
    }

    /**
     * @param options the raw format options
     * @return the parsed {@code fail_on_missing_field} flag, defaulting to {@code false}
     */
    public static boolean getFailOnMissingField(Map<String, String> options) {
        return Boolean.parseBoolean(
            options.getOrDefault(FAIL_ON_MISSING_FIELD, Boolean.FALSE.toString()));
    }

    /**
     * @param options the raw format options
     * @return the parsed {@code ignore_parse_errors} flag, defaulting to {@code false}
     */
    public static boolean getIgnoreParseErrors(Map<String, String> options) {
        return Boolean.parseBoolean(
            options.getOrDefault(IGNORE_PARSE_ERRORS, Boolean.FALSE.toString()));
    }
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
new file mode 100644
index 000000000..3510acf00
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JsonSerializationSchema implements SerializationSchema {
+
+ /**
+ * RowType to generate the runtime converter.
+ */
+ private final SeaTunnelRowType rowType;
+
+ /** Reusable object node. */
+ private transient ObjectNode node;
+
+ /** Object mapper that is used to create output JSON objects. */
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private final RowToJsonConverters.RowToJsonConverter runtimeConverter;
+
+ public JsonSerializationSchema(SeaTunnelRowType rowType) {
+ this.rowType = rowType;
+ this.runtimeConverter = new RowToJsonConverters()
+ .createConverter(checkNotNull(rowType));
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow row) {
+ if (node == null) {
+ node = mapper.createObjectNode();
+ }
+
+ try {
+ runtimeConverter.convert(mapper, node, row);
+ return mapper.writeValueAsBytes(node);
+ } catch (Throwable e) {
+ throw new RuntimeException(
+ String.format("Failed to deserialize JSON '%s'.", row), e);
+ }
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 5f024faa5..138b55316 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -258,7 +258,7 @@ public class JsonToRowConverters implements Serializable {
}
/**
- * Runtime converter that converts {@link JsonNode}s into objects of Flink
Table & SQL internal
+ * Runtime converter that converts {@link JsonNode}s into objects of
internal
* data structures.
*/
@FunctionalInterface
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
new file mode 100644
index 000000000..02c6ab9a7
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Map;
+
+public class RowToJsonConverters implements Serializable {
+
+ private static final long serialVersionUID = 6988876688930916940L;
+
+ public RowToJsonConverter createConverter(SeaTunnelDataType<?> type) {
+ return wrapIntoNullableConverter(createNotNullConverter(type));
+ }
+
+ private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter
converter) {
+ return (mapper, reuse, value) -> {
+ if (value == null) {
+ return mapper.getNodeFactory().nullNode();
+ }
+ return converter.convert(mapper, reuse, value);
+ };
+ }
+
+ private RowToJsonConverter createNotNullConverter(SeaTunnelDataType<?>
type) {
+ SqlType sqlType = type.getSqlType();
+ switch (sqlType) {
+ case ROW:
+ return createRowConverter((SeaTunnelRowType) type);
+ case NULL:
+ return (mapper, reuse, value) -> null;
+ case BOOLEAN:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().booleanNode((Boolean) value);
+ case TINYINT:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((byte) value);
+ case SMALLINT:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((short) value);
+ case INT:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((int) value);
+ case BIGINT:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((long) value);
+ case FLOAT:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((float) value);
+ case DOUBLE:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((double) value);
+ case DECIMAL:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().numberNode((BigDecimal) value);
+ case BYTES:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().binaryNode((byte[]) value);
+ case STRING:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().textNode((String) value);
+ case DATE:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) value));
+ case TIME:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().textNode(TimeFormat.TIME_FORMAT.format((LocalTime)
value));
+ case TIMESTAMP:
+ return (mapper, reuse, value) ->
mapper.getNodeFactory().textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime)
value));
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case MAP:
+ MapType mapType = (MapType) type;
+ return createMapConverter(mapType.toString(),
mapType.getKeyType(), mapType.getValueType());
+ default:
+ throw new UnsupportedOperationException("unsupported parse
type: " + type);
+ }
+ }
+
+ private RowToJsonConverter createRowConverter(SeaTunnelRowType rowType) {
+ final RowToJsonConverter[] fieldConverters =
+ Arrays.stream(rowType.getFieldTypes())
+ .map(this::createConverter)
+ .toArray(RowToJsonConverter[]::new);
+ final String[] fieldNames = rowType.getFieldNames();
+ final int arity = fieldNames.length;
+
+ return (mapper, reuse, value) -> {
+ ObjectNode node;
+
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createObjectNode();
+ } else {
+ node = (ObjectNode) reuse;
+ }
+
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ SeaTunnelRow row = (SeaTunnelRow) value;
+ node.set(fieldName, fieldConverters[i].convert(
+ mapper, node.get(fieldName), row.getField(i)));
+ }
+
+ return node;
+ };
+ }
+
+ private RowToJsonConverter createArrayConverter(ArrayType arrayType) {
+ final RowToJsonConverter elementConverter =
createConverter(arrayType.getElementType());
+ return (mapper, reuse, value) -> {
+ ArrayNode node;
+
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createArrayNode();
+ } else {
+ node = (ArrayNode) reuse;
+ node.removeAll();
+ }
+
+ Object[] arrayData = (Object[]) value;
+ int numElements = arrayData.length;
+ for (int i = 0; i < numElements; i++) {
+ Object element = arrayData[i];
+ node.add(elementConverter.convert(mapper, null, element));
+ }
+
+ return node;
+ };
+ }
+
+ private RowToJsonConverter createMapConverter(String typeSummary,
SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
+ if (!SqlType.STRING.equals(keyType.getSqlType())) {
+ throw new UnsupportedOperationException(
+ "JSON format doesn't support non-string as key type of
map. The type is: " + typeSummary);
+ }
+
+ final RowToJsonConverter valueConverter = createConverter(valueType);
+ return (mapper, reuse, value) -> {
+ ObjectNode node;
+
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createObjectNode();
+ } else {
+ node = (ObjectNode) reuse;
+ node.removeAll();
+ }
+
+ Map<String, ?> mapData = (Map) value;
+ for (Map.Entry<String, ?> entry : mapData.entrySet()) {
+ String fieldName = entry.getKey();
+ node.set(fieldName, valueConverter.convert(mapper,
node.get(fieldName), entry.getValue()));
+ }
+
+ return node;
+ };
+ }
+
+ public interface RowToJsonConverter extends Serializable {
+ JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
+ }
+}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/TimeFormat.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/TimeFormat.java
new file mode 100644
index 000000000..6c1a14b77
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/TimeFormat.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+
/**
 * Shared time formatter for the JSON format.
 *
 * <p>Formats a time as {@code HH:mm:ss}, appending a fractional-second part of up to
 * nanosecond precision. Because the minimum fraction width is 0, the decimal point and
 * fraction are omitted entirely when the nanosecond field is zero.
 */
public final class TimeFormat {

    /** Maximum number of fractional digits (nanosecond precision). */
    private static final int MAX_TIME_PRECISION = 9;

    public static final DateTimeFormatter TIME_FORMAT =
        new DateTimeFormatterBuilder()
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, MAX_TIME_PRECISION, true)
            .toFormatter();

    /** Constant holder; not instantiable. */
    private TimeFormat() {
    }
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
new file mode 100644
index 000000000..fc35e97cd
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.format.json.JsonFormatFactory
\ No newline at end of file