This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd68c06b0 [Feature][Connector-V2] Add json file sink & json format 
(#2385)
dd68c06b0 is described below

commit dd68c06b0accf1570c845dae833fee3c331a2b8a
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 10 09:54:04 2022 +0800

    [Feature][Connector-V2] Add json file sink & json format (#2385)
    
    * [Feature][Connector-V2] Add [File]JSON sink & json format
    
    Co-authored-by: wanghailin <[email protected]>
---
 docs/en/connector-v2/sink/HdfsFile.md              |   2 +-
 docs/en/connector-v2/sink/LocalFile.md             |   2 +-
 .../connector-file/connector-file-base/pom.xml     |   5 +
 .../seatunnel/file/config/FileFormat.java          |   3 +-
 .../hdfs/HdfsJsonTransactionStateFileWriter.java   | 121 +++++++++++++
 .../hdfs/HdfsTransactionStateFileWriteFactory.java |  13 ++
 .../local/LocalJsonTransactionStateFileWriter.java | 122 +++++++++++++
 .../LocalTransactionStateFileWriteFactory.java     |  13 ++
 .../e2e/flink/v2/file/FakeSourceToFileIT.java      |  18 ++
 .../resources/file/fakesource_to_hdfs_json.conf    |  65 +++++++
 .../resources/file/fakesource_to_local_json.conf   |  65 +++++++
 .../e2e/spark/v2/file/FakeSourceToFileIT.java      |  19 +++
 .../resources/file/fakesource_to_hdfs_json.conf    |  64 +++++++
 .../resources/file/fakesource_to_local_json.conf   |  64 +++++++
 .../format/json/JsonDeserializationSchema.java     |   2 +-
 .../seatunnel/format/json/JsonFormatFactory.java   |  72 ++++++++
 .../seatunnel/format/json/JsonFormatOptions.java   |  36 ++++
 .../format/json/JsonSerializationSchema.java       |  65 +++++++
 .../seatunnel/format/json/JsonToRowConverters.java |   2 +-
 .../seatunnel/format/json/RowToJsonConverters.java | 188 +++++++++++++++++++++
 .../apache/seatunnel/format/json/TimeFormat.java   |  32 ++++
 .../org.apache.seatunnel.api.table.factory.Factory |  18 ++
 22 files changed, 986 insertions(+), 5 deletions(-)

diff --git a/docs/en/connector-v2/sink/HdfsFile.md 
b/docs/en/connector-v2/sink/HdfsFile.md
index 2513d59c1..cbc2d616a 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -40,7 +40,7 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 We supported as the following file types:
 
-`text` `csv` `parquet` `orc`
+`text` `csv` `parquet` `orc` `json`
 
 Please note that, The final file name will ends with the file_format's suffix, 
the suffix of the text file is `txt`.
 
diff --git a/docs/en/connector-v2/sink/LocalFile.md 
b/docs/en/connector-v2/sink/LocalFile.md
index f7e820cc8..773c625af 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -38,7 +38,7 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 We supported as the following file types:
 
-`text` `csv` `parquet` `orc`
+`text` `csv` `parquet` `orc` `json`
 
 Please note that, The final file name will ends with the file_format's suffix, 
the suffix of the text file is `txt`.
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml 
b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index b046c2278..05e98573e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -35,6 +35,11 @@
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index c02f58168..f9ca34409 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -23,7 +23,8 @@ public enum FileFormat implements Serializable {
     CSV("csv"),
     TEXT("txt"),
     PARQUET("parquet"),
-    ORC("orc");
+    ORC("orc"),
+    JSON("json");
 
     private String suffix;
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsJsonTransactionStateFileWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsJsonTransactionStateFileWriter.java
new file mode 100644
index 000000000..ec21990ee
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsJsonTransactionStateFileWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class HdfsJsonTransactionStateFileWriter extends 
AbstractTransactionStateFileWriter {
+
+    private static final long serialVersionUID = -5432828969702531646L;
+
+    private final byte[] rowDelimiter;
+    private final SerializationSchema serializationSchema;
+    private Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+    public HdfsJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo,
+                                              @NonNull 
TransactionFileNameGenerator transactionFileNameGenerator,
+                                              @NonNull 
PartitionDirNameGenerator partitionDirNameGenerator,
+                                              @NonNull List<Integer> 
sinkColumnsIndexInRow,
+                                              @NonNull String tmpPath,
+                                              @NonNull String targetPath,
+                                              @NonNull String jobId,
+                                              int subTaskIndex,
+                                              @NonNull String rowDelimiter,
+                                              @NonNull FileSystem fileSystem) {
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, 
partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, 
subTaskIndex, fileSystem);
+
+        this.rowDelimiter = rowDelimiter.getBytes();
+        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowTypeInfo);
+        beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void abortTransaction(String transactionId) {
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
+        try {
+            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+            fsDataOutputStream.write(rowBytes);
+            fsDataOutputStream.write(rowDelimiter);
+        } catch (IOException e) {
+            log.error("write data to file {} error", filePath);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        beingWrittenOutputStream.entrySet().forEach(entry -> {
+            try {
+                entry.getValue().flush();
+            } catch (IOException e) {
+                log.error("error when flush file {}", entry.getKey());
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    entry.getValue().close();
+                } catch (IOException e) {
+                    log.error("error when close output stream {}", 
entry.getKey());
+                }
+            }
+
+            needMoveFiles.put(entry.getKey(), 
getTargetLocation(entry.getKey()));
+        });
+    }
+
+    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+        FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
+        if (fsDataOutputStream == null) {
+            try {
+                fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
+                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+            } catch (IOException e) {
+                log.error("can not get output file stream");
+                throw new RuntimeException(e);
+            }
+        }
+        return fsDataOutputStream;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
index f4c5cd840..1761b799d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
@@ -85,6 +85,19 @@ public class HdfsTransactionStateFileWriteFactory {
                     subTaskIndex,
                     fileSystem);
         }
+        if (fileFormat.equals(FileFormat.JSON)) {
+            return new HdfsJsonTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    rowDelimiter,
+                    fileSystem);
+        }
         // if file type not supported by file connector, default txt writer 
will be generated
         return new HdfsTxtTransactionStateFileWriter(
                     seaTunnelRowTypeInfo,
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalJsonTransactionStateFileWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalJsonTransactionStateFileWriter.java
new file mode 100644
index 000000000..e087bfff9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalJsonTransactionStateFileWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class LocalJsonTransactionStateFileWriter extends 
AbstractTransactionStateFileWriter {
+
+    private static final long serialVersionUID = -3834472539886339383L;
+
+    private final byte[] rowDelimiter;
+    private final SerializationSchema serializationSchema;
+    private Map<String, FileOutputStream> beingWrittenOutputStream;
+
+    public LocalJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo,
+                                               @NonNull 
TransactionFileNameGenerator transactionFileNameGenerator,
+                                               @NonNull 
PartitionDirNameGenerator partitionDirNameGenerator,
+                                               @NonNull List<Integer> 
sinkColumnsIndexInRow,
+                                               @NonNull String tmpPath,
+                                               @NonNull String targetPath,
+                                               @NonNull String jobId,
+                                               int subTaskIndex,
+                                               @NonNull String rowDelimiter,
+                                               @NonNull FileSystem fileSystem) 
{
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, 
partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, 
subTaskIndex, fileSystem);
+
+        this.rowDelimiter = rowDelimiter.getBytes();
+        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowTypeInfo);
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void abortTransaction(String transactionId) {
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        FileOutputStream fileOutputStream = getOrCreateOutputStream(filePath);
+        try {
+            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+            fileOutputStream.write(rowBytes);
+            fileOutputStream.write(rowDelimiter);
+        } catch (IOException e) {
+            log.error("write data to file {} error", filePath);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        beingWrittenOutputStream.entrySet().forEach(entry -> {
+            try {
+                entry.getValue().flush();
+            } catch (IOException e) {
+                log.error("error when flush file {}", entry.getKey());
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    entry.getValue().close();
+                } catch (IOException e) {
+                    log.error("error when close output stream {}", 
entry.getKey());
+                }
+            }
+
+            needMoveFiles.put(entry.getKey(), 
getTargetLocation(entry.getKey()));
+        });
+    }
+
+    private FileOutputStream getOrCreateOutputStream(@NonNull String filePath) 
{
+        FileOutputStream fileOutputStream = 
beingWrittenOutputStream.get(filePath);
+        if (fileOutputStream == null) {
+            try {
+                FileUtils.createFile(filePath);
+                fileOutputStream = new FileOutputStream(filePath);
+                beingWrittenOutputStream.put(filePath, fileOutputStream);
+            } catch (IOException e) {
+                log.error("can not get output file stream");
+                throw new RuntimeException(e);
+            }
+        }
+        return fileOutputStream;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
index 05e92d5fd..2efab19b7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
@@ -85,6 +85,19 @@ public class LocalTransactionStateFileWriteFactory {
                     subTaskIndex,
                     fileSystem);
         }
+        if (fileFormat.equals(FileFormat.JSON)) {
+            return new LocalJsonTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    rowDelimiter,
+                    fileSystem);
+        }
         // if file type not supported by file connector, default txt writer 
will be generated
         return new LocalTxtTransactionStateFileWriter(
                     seaTunnelRowTypeInfo,
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
index 9ac33002e..50d7c34d2 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
@@ -45,6 +45,15 @@ public class FakeSourceToFileIT extends FlinkContainer {
         Assertions.assertEquals(0, execResult.getExitCode());
     }
 
+    /**
+     *  fake source -> local json file sink
+     */
+    @Test
+    public void testFakeSourceToLocalFileJson() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/file/fakesource_to_local_json.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
     /**
      * fake source -> hdfs text file sink
      */
@@ -62,4 +71,13 @@ public class FakeSourceToFileIT extends FlinkContainer {
         Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_parquet.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    /**
+     * fake source -> hdfs json file sink
+     */
+    @Test
+    public void testFakeSourceToHdfsFileJson() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_json.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
new file mode 100644
index 000000000..769c8760d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+  sql {
+    sql = "select name,age from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  HdfsFile {
+    path="/tmp/hive/warehouse/test2"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="json"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
new file mode 100644
index 000000000..b18b472f6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  sql {
+    sql = "select name,age from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  LocalFile {
+    path="/tmp/hive/warehouse/test2"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="json"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
index fe48fb974..fccd8db06 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
@@ -49,6 +49,16 @@ public class FakeSourceToFileIT extends SparkContainer {
         Assertions.assertEquals(0, execResult.getExitCode());
     }
 
+    /**
+     *  fake source -> local json file sink
+     */
+    @Test
+    public void testFakeSourceToLocalFileJson() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+
     /**
      * fake source -> hdfs text file sink
      */
@@ -66,4 +76,13 @@ public class FakeSourceToFileIT extends SparkContainer {
         Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_parquet.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    /**
+     * fake source -> hdfs json file sink
+     */
+    @Test
+    public void testFakeSourceToHdfsFileJson() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_json.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
new file mode 100644
index 000000000..c4d1aabe5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+  sql {
+    sql = "select name,age from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  HdfsFile {
+    path="/tmp/hive/warehouse/test2"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="json"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
new file mode 100644
index 000000000..d257f81bb
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+  sql {
+    sql = "select name,age from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  LocalFile {
+    path="/tmp/hive/warehouse/test2"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="json"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index 9808c80e6..202f62b85 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -54,7 +54,7 @@ public class JsonDeserializationSchema implements 
DeserializationSchema<SeaTunne
     private final SeaTunnelRowType rowType;
 
     /**
-     * Runtime converter that converts {@link JsonNode}s into objects of Flink 
SQL internal data
+     * Runtime converter that converts {@link JsonNode}s into objects of 
internal data
      * structures.
      */
     private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java
new file mode 100644
index 000000000..3b1dce8f6
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
/**
 * SPI factory for the JSON format, registered under identifier {@code "json"}
 * (see META-INF/services). Creates both the deserialization format (bytes ->
 * {@code SeaTunnelRow}) and the serialization format (row -> JSON bytes).
 */
public class JsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

    /** Format identifier used in connector option {@code format = "json"}. */
    public static final String IDENTIFIER = "json";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public OptionRule optionRule() {
        // TODO config option rules
        // Currently empty: fail_on_missing_field / ignore_parse_errors are read
        // directly from the raw options map in createDeserializationFormat.
        return OptionRule.builder().build();
    }

    /**
     * Builds a deserialization format configured from the context's raw options.
     * Both boolean options default to false when absent.
     */
    @Override
    public DeserializationFormat createDeserializationFormat(TableFactoryContext context) {
        Map<String, String> options = context.getOptions();
        boolean failOnMissingField = JsonFormatOptions.getFailOnMissingField(options);
        boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options);

        // TODO config SeaTunnelRowType
        // NOTE(review): the row type is passed as null, so the produced schema
        // cannot convert typed fields yet — confirm how the row type gets wired in.
        return new DeserializationFormat() {
            @Override
            public DeserializationSchema createDeserializationSchema() {
                return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null);
            }
        };
    }

    /**
     * Builds a serialization format.
     */
    @Override
    public SerializationFormat createSerializationFormat(TableFactoryContext context) {
        // TODO config SeaTunnelRowType
        // NOTE(review): JsonSerializationSchema's constructor applies checkNotNull
        // to the row type, so calling createSerializationSchema() with this null
        // will throw NPE until the row type is configured — confirm intended TODO scope.
        return new SerializationFormat() {
            @Override
            public SerializationSchema createSerializationSchema() {
                return new JsonSerializationSchema(null);
            }
        };
    }
}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
new file mode 100644
index 000000000..47e8dd296
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import java.util.Map;
+
+public class JsonFormatOptions {
+
+    public static final String FAIL_ON_MISSING_FIELD = "fail_on_missing_field";
+
+    public static final String IGNORE_PARSE_ERRORS = "ignore_parse_errors";
+
+    public static boolean getFailOnMissingField(Map<String, String> options) {
+        return 
Boolean.parseBoolean(options.getOrDefault(FAIL_ON_MISSING_FIELD, 
Boolean.FALSE.toString()));
+    }
+
+    public static boolean getIgnoreParseErrors(Map<String, String> options) {
+        return Boolean.parseBoolean(options.getOrDefault(IGNORE_PARSE_ERRORS, 
Boolean.FALSE.toString()));
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
new file mode 100644
index 000000000..3510acf00
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JsonSerializationSchema implements SerializationSchema {
+
+    /**
+     * RowType to generate the runtime converter.
+     */
+    private final SeaTunnelRowType rowType;
+
+    /** Reusable object node. */
+    private transient ObjectNode node;
+
+    /** Object mapper that is used to create output JSON objects. */
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private final RowToJsonConverters.RowToJsonConverter runtimeConverter;
+
+    public JsonSerializationSchema(SeaTunnelRowType rowType) {
+        this.rowType = rowType;
+        this.runtimeConverter = new RowToJsonConverters()
+                .createConverter(checkNotNull(rowType));
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        if (node == null) {
+            node = mapper.createObjectNode();
+        }
+
+        try {
+            runtimeConverter.convert(mapper, node, row);
+            return mapper.writeValueAsBytes(node);
+        } catch (Throwable e) {
+            throw new RuntimeException(
+                    String.format("Failed to deserialize JSON '%s'.", row), e);
+        }
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 5f024faa5..138b55316 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -258,7 +258,7 @@ public class JsonToRowConverters implements Serializable {
     }
 
     /**
-     * Runtime converter that converts {@link JsonNode}s into objects of Flink 
Table & SQL internal
+     * Runtime converter that converts {@link JsonNode}s into objects of 
internal
      * data structures.
      */
     @FunctionalInterface
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
new file mode 100644
index 000000000..02c6ab9a7
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Map;
+
/**
 * Builds runtime converters that turn a {@link SeaTunnelRow} (or nested value)
 * into a Jackson {@link JsonNode} tree. A converter is created once per data
 * type and reused for every record; the class is {@link Serializable} so it can
 * ship inside a serialization schema.
 */
public class RowToJsonConverters implements Serializable {

    private static final long serialVersionUID = 6988876688930916940L;

    /**
     * Creates a converter for {@code type}; the returned converter maps a Java
     * {@code null} value to a JSON null node.
     */
    public RowToJsonConverter createConverter(SeaTunnelDataType<?> type) {
        return wrapIntoNullableConverter(createNotNullConverter(type));
    }

    // Null-guard wrapper: short-circuits null values to NullNode before the
    // type-specific converter (which assumes non-null) runs.
    private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter converter) {
        return (mapper, reuse, value) -> {
            if (value == null) {
                return mapper.getNodeFactory().nullNode();
            }
            return converter.convert(mapper, reuse, value);
        };
    }

    // Dispatches on the SeaTunnel SQL type. Each lambda casts the boxed value to
    // the Java type corresponding to the SqlType and emits a leaf node.
    private RowToJsonConverter createNotNullConverter(SeaTunnelDataType<?> type) {
        SqlType sqlType = type.getSqlType();
        switch (sqlType) {
            case ROW:
                return createRowConverter((SeaTunnelRowType) type);
            case NULL:
                // NOTE(review): returns Java null, not NullNode; this relies on the
                // caller (ObjectNode.set / ArrayNode.add) converting null to a
                // NullNode — confirm against the Jackson version in use.
                return (mapper, reuse, value) -> null;
            case BOOLEAN:
                return (mapper, reuse, value) -> mapper.getNodeFactory().booleanNode((Boolean) value);
            case TINYINT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((byte) value);
            case SMALLINT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((short) value);
            case INT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((int) value);
            case BIGINT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((long) value);
            case FLOAT:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((float) value);
            case DOUBLE:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((double) value);
            case DECIMAL:
                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((BigDecimal) value);
            case BYTES:
                // Encoded by Jackson as a base64 string.
                return (mapper, reuse, value) -> mapper.getNodeFactory().binaryNode((byte[]) value);
            case STRING:
                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode((String) value);
            case DATE:
                // ISO-8601 date, e.g. "2022-08-10".
                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) value));
            case TIME:
                // "HH:mm:ss" with optional nanosecond fraction (see TimeFormat).
                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(TimeFormat.TIME_FORMAT.format((LocalTime) value));
            case TIMESTAMP:
                // ISO-8601 local date-time, e.g. "2022-08-10T09:54:04".
                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
            case ARRAY:
                return createArrayConverter((ArrayType) type);
            case MAP:
                MapType mapType = (MapType) type;
                return createMapConverter(mapType.toString(), mapType.getKeyType(), mapType.getValueType());
            default:
                throw new UnsupportedOperationException("unsupported parse type: " + type);
        }
    }

    // Converts a SeaTunnelRow to an ObjectNode, one field per row position.
    // Field converters are built eagerly so the per-record path does no dispatch.
    private RowToJsonConverter createRowConverter(SeaTunnelRowType rowType) {
        final RowToJsonConverter[] fieldConverters =
                Arrays.stream(rowType.getFieldTypes())
                        .map(this::createConverter)
                        .toArray(RowToJsonConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames();
        final int arity = fieldNames.length;

        return (mapper, reuse, value) -> {
            ObjectNode node;

            // reuse could be a NullNode if last record is null.
            if (reuse == null || reuse.isNull()) {
                node = mapper.createObjectNode();
            } else {
                node = (ObjectNode) reuse;
            }

            for (int i = 0; i < arity; i++) {
                String fieldName = fieldNames[i];
                SeaTunnelRow row = (SeaTunnelRow) value;
                // Pass the existing child node as "reuse" so nested object/array
                // nodes are recycled across records.
                node.set(fieldName, fieldConverters[i].convert(
                        mapper, node.get(fieldName), row.getField(i)));
            }

            return node;
        };
    }

    // Converts an Object[] to an ArrayNode; the node is cleared (removeAll) when
    // reused so stale elements from the previous record do not leak through.
    private RowToJsonConverter createArrayConverter(ArrayType arrayType) {
        final RowToJsonConverter elementConverter = createConverter(arrayType.getElementType());
        return (mapper, reuse, value) -> {
            ArrayNode node;

            // reuse could be a NullNode if last record is null.
            if (reuse == null || reuse.isNull()) {
                node = mapper.createArrayNode();
            } else {
                node = (ArrayNode) reuse;
                node.removeAll();
            }

            Object[] arrayData = (Object[]) value;
            int numElements = arrayData.length;
            for (int i = 0; i < numElements; i++) {
                Object element = arrayData[i];
                // Elements get no reuse node (null) since the array was cleared.
                node.add(elementConverter.convert(mapper, null, element));
            }

            return node;
        };
    }

    /**
     * Converts a {@code Map<String, ?>} to an ObjectNode. JSON object keys must
     * be strings, so non-string key types are rejected up front;
     * {@code typeSummary} is only used in that error message.
     */
    private RowToJsonConverter createMapConverter(String typeSummary, SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
        if (!SqlType.STRING.equals(keyType.getSqlType())) {
            throw new UnsupportedOperationException(
                    "JSON format doesn't support non-string as key type of map. The type is: " + typeSummary);
        }

        final RowToJsonConverter valueConverter = createConverter(valueType);
        return (mapper, reuse, value) -> {
            ObjectNode node;

            // reuse could be a NullNode if last record is null.
            if (reuse == null || reuse.isNull()) {
                node = mapper.createObjectNode();
            } else {
                node = (ObjectNode) reuse;
                node.removeAll();
            }

            Map<String, ?> mapData = (Map) value;
            for (Map.Entry<String, ?> entry : mapData.entrySet()) {
                String fieldName = entry.getKey();
                node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
            }

            return node;
        };
    }

    /**
     * Converter from a single value to a {@link JsonNode}; {@code reuse} is a
     * previously produced node that implementations may recycle.
     */
    public interface RowToJsonConverter extends Serializable {
        JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
    }
}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/TimeFormat.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/TimeFormat.java
new file mode 100644
index 000000000..6c1a14b77
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/TimeFormat.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+
/**
 * Shared {@link DateTimeFormatter} for encoding {@code TIME} values as JSON
 * strings.
 *
 * <p>Format is {@code HH:mm:ss} followed by an optional fractional-second part
 * of up to nine digits (nanosecond precision); the fraction (and its decimal
 * point) is omitted entirely when zero. {@link DateTimeFormatter} is immutable
 * and thread-safe, so a single shared instance is fine.
 */
public final class TimeFormat {

    /** Maximum fractional-second digits: nanoseconds. */
    private static final int MAX_TIME_PRECISION = 9;

    public static final DateTimeFormatter TIME_FORMAT =
            new DateTimeFormatterBuilder()
                    .appendPattern("HH:mm:ss")
                    .appendFraction(ChronoField.NANO_OF_SECOND, 0, MAX_TIME_PRECISION, true)
                    .toFormatter();

    private TimeFormat() {
        // Constants holder; no instances.
    }
}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
 
b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
new file mode 100644
index 000000000..fc35e97cd
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.format.json.JsonFormatFactory
\ No newline at end of file

Reply via email to