This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f73b37291e [Feature] [File Connector] Supports writing column names 
when the output type is file (CSV) (#5459)
f73b37291e is described below

commit f73b37291e6eedb38887f7cc0df150020b52d7c8
Author: dian <[email protected]>
AuthorDate: Mon Sep 25 16:36:09 2023 +0800

    [Feature] [File Connector] Supports writing column names when the output 
type is file (CSV) (#5459)
    
    * [Feature] [File Connector] Supports writing column names when the output 
type is file (CSV) #5443
    
    * [Feature] [File Connector] fix code style and lineSeparator #5443
    
    * [Feature] [File Connector] add enable_header_write,false:dont write 
header,true:write header. #5443
    
    * [Feature] [File Connector] fix code style #5443
    
    * [Feature] [File Connector] add enable_header_write explain #5443
    
    * [Feature] [File Connector]fix code style #5443
    
    * Update docs/en/connector-v2/sink/LocalFile.md
    
    * Update docs/en/connector-v2/sink/LocalFile.md
    
    * [Feature] [File Connector]fix code style #5443
    
    * [Feature] [File Connector]add junit test #5443
    
    * [Feature] [File Connector]add license header: #5443
    
    * [Feature] [File Connector] Supports writing column names when the output 
type is file (CSV) #5443
    
    * [Feature] [File Connector] fix code style and lineSeparator #5443
    
    * [Feature] [File Connector] add enable_header_write,false:dont write 
header,true:write header. #5443
    
    * [Feature] [File Connector] fix code style #5443
    
    * [Feature] [File Connector] add enable_header_write explain #5443
    
    * [Feature] [File Connector]fix code style #5443
    
    * Update docs/en/connector-v2/sink/LocalFile.md
    
    * Update docs/en/connector-v2/sink/LocalFile.md
    
    * [Feature] [File Connector]fix code style #5443
    
    * [Feature] [File Connector]add junit test #5443
    
    * [Feature] [File Connector]add license header: #5443
    
    * [Feature] [File Connector]add junit: #5443
    
    * [Feature] [File Connector]add junit: #5443
    
    * [Feature] [File Connector]remove scala: #5443
    
    * [Feature] [File Connector]modify md style: #5443
    
    * [Feature] [File Connector] Supports writing column names when the output 
type is file (CSV) #5443
    
    * [Feature] [File Connector] fix code style and lineSeparator #5443
    
    * [Feature] [File Connector] add enable_header_write,false:dont write 
header,true:write header. #5443
    
    * [Feature] [File Connector] fix code style #5443
    
    * [Feature] [File Connector] add enable_header_write explain #5443
    
    * [Feature] [File Connector]fix code style #5443
    
    * Update docs/en/connector-v2/sink/LocalFile.md
    
    * Update docs/en/connector-v2/sink/LocalFile.md
    
    * [Feature] [File Connector]fix code style #5443
    
    * [Feature] [File Connector]add junit test #5443
    
    * [Feature] [File Connector]add license header: #5443
    
    * [Feature] [File Connector]add junit: #5443
    
    * [Feature] [File Connector]add junit: #5443
    
    * [Feature] [File Connector]remove scala: #5443
    
    * [Feature] [File Connector]modify md style: #5443
    
    * [Feature] [File Connector]junit modify: #5443
    
    ---------
    
    Co-authored-by: zck <[email protected]>
    Co-authored-by: Eric <[email protected]>
---
 docs/en/connector-v2/sink/LocalFile.md             |  45 ++---
 .../seatunnel/file/config/BaseFileSinkConfig.java  |   5 +
 .../seatunnel/file/config/BaseSinkConfig.java      |   6 +
 .../file/sink/writer/TextWriteStrategy.java        |  19 ++
 .../apache/seatunnel/engine/e2e/TextHeaderIT.java  | 192 +++++++++++++++++++++
 .../resources/batch_fakesource_to_file_header.conf |  49 ++++++
 6 files changed, 296 insertions(+), 20 deletions(-)

diff --git a/docs/en/connector-v2/sink/LocalFile.md 
b/docs/en/connector-v2/sink/LocalFile.md
index 8e2c1526e9..90e80c6c37 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -30,26 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-|               name               |  type   | required |               
default value                |                          remarks                 
         |
-|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
-| path                             | string  | yes      | -                    
                      |                                                         
  |
-| custom_filename                  | boolean | no       | false                
                      | Whether you need custom the filename                    
  |
-| file_name_expression             | string  | no       | "${transactionId}"   
                      | Only used when custom_filename is true                  
  |
-| filename_time_format             | string  | no       | "yyyy.MM.dd"         
                      | Only used when custom_filename is true                  
  |
-| file_format_type                 | string  | no       | "csv"                
                      |                                                         
  |
-| field_delimiter                  | string  | no       | '\001'               
                      | Only used when file_format_type is text                 
  |
-| row_delimiter                    | string  | no       | "\n"                 
                      | Only used when file_format_type is text                 
  |
-| have_partition                   | boolean | no       | false                
                      | Whether you need processing partitions.                 
  |
-| partition_by                     | array   | no       | -                    
                      | Only used then have_partition is true                   
  |
-| partition_dir_expression         | string  | no       | 
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is 
true                     |
-| is_partition_field_write_in_file | boolean | no       | false                
                      | Only used then have_partition is true                   
  |
-| sink_columns                     | array   | no       |                      
                      | When this parameter is empty, all fields are sink 
columns |
-| is_enable_transaction            | boolean | no       | true                 
                      |                                                         
  |
-| batch_size                       | int     | no       | 1000000              
                      |                                                         
  |
-| compress_codec                   | string  | no       | none                 
                      |                                                         
  |
-| common-options                   | object  | no       | -                    
                      |                                                         
  |
-| max_rows_in_memory               | int     | no       | -                    
                      | Only used when file_format_type is excel.               
  |
-| sheet_name                       | string  | no       | Sheet${Random 
number}                      | Only used when file_format_type is excel.        
         |
+|               name               |  type   | required |               
default value                |                                            
remarks                                            |
+|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------|
+| path                             | string  | yes      | -                    
                      |                                                         
                                      |
+| custom_filename                  | boolean | no       | false                
                      | Whether you need custom the filename                    
                                      |
+| file_name_expression             | string  | no       | "${transactionId}"   
                      | Only used when custom_filename is true                  
                                      |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"         
                      | Only used when custom_filename is true                  
                                      |
+| file_format_type                 | string  | no       | "csv"                
                      |                                                         
                                      |
+| field_delimiter                  | string  | no       | '\001'               
                      | Only used when file_format_type is text                 
                                      |
+| row_delimiter                    | string  | no       | "\n"                 
                      | Only used when file_format_type is text                 
                                      |
+| have_partition                   | boolean | no       | false                
                      | Whether you need processing partitions.                 
                                      |
+| partition_by                     | array   | no       | -                    
                      | Only used then have_partition is true                   
                                      |
+| partition_dir_expression         | string  | no       | 
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is 
true                                                         |
+| is_partition_field_write_in_file | boolean | no       | false                
                      | Only used then have_partition is true                   
                                      |
+| sink_columns                     | array   | no       |                      
                      | When this parameter is empty, all fields are sink 
columns                                     |
+| is_enable_transaction            | boolean | no       | true                 
                      |                                                         
                                      |
+| batch_size                       | int     | no       | 1000000              
                      |                                                         
                                      |
+| compress_codec                   | string  | no       | none                 
                      |                                                         
                                      |
+| common-options                   | object  | no       | -                    
                      |                                                         
                                      |
+| max_rows_in_memory               | int     | no       | -                    
                      | Only used when file_format_type is excel.               
                                      |
+| sheet_name                       | string  | no       | Sheet${Random 
number}                      | Only used when file_format_type is excel.        
                                             |
+| enable_header_write              | boolean | no       | false                
                      | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header. |
 
 ### path [string]
 
@@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items 
that can be cached in
 
 Writer the sheet of the workbook
 
+### enable_header_write [boolean]
+
+Only used when file_format_type is text or csv. false: don't write the header; true: write the header.
+
 ## Example
 
 For orc file format simple config
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 9a0ac6c678..112ab9fa1c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, 
Serializable {
     protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
     protected DateTimeUtils.Formatter datetimeFormat = 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
     protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+    protected Boolean enableHeaderWriter = false;
 
     public BaseFileSinkConfig(@NonNull Config config) {
         if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
@@ -99,6 +100,10 @@ public class BaseFileSinkConfig implements DelimiterConfig, 
Serializable {
             timeFormat =
                     
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
         }
+
+        if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
+            enableHeaderWriter = 
config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
+        }
     }
 
     public BaseFileSinkConfig() {}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 431a5d1daa..4f4d09d75c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -232,4 +232,10 @@ public class BaseSinkConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("To be written sheet name,only valid for 
excel files");
+
+    public static final Option<Boolean> ENABLE_HEADER_WRITE =
+            Options.key("enable_header_write")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("false:dont write header,true:write 
header");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index f309edb70f..b4b7bdb955 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
     private final DateUtils.Formatter dateFormat;
     private final DateTimeUtils.Formatter dateTimeFormat;
     private final TimeUtils.Formatter timeFormat;
+    private final FileFormat fileFormat;
+    private final Boolean enableHeaderWriter;
     private SerializationSchema serializationSchema;
 
     public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -58,6 +61,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
         this.dateFormat = fileSinkConfig.getDateFormat();
         this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
         this.timeFormat = fileSinkConfig.getTimeFormat();
+        this.fileFormat = fileSinkConfig.getFileFormat();
+        this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
     }
 
     @Override
@@ -133,15 +138,18 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
                         OutputStream out =
                                 
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
                         fsDataOutputStream = new FSDataOutputStream(out, null);
+                        enableWriteHeader(fsDataOutputStream);
                         break;
                     case NONE:
                         fsDataOutputStream = 
fileSystemUtils.getOutputStream(filePath);
+                        enableWriteHeader(fsDataOutputStream);
                         break;
                     default:
                         log.warn(
                                 "Text file does not support this compress 
type: {}",
                                 compressFormat.getCompressCodec());
                         fsDataOutputStream = 
fileSystemUtils.getOutputStream(filePath);
+                        enableWriteHeader(fsDataOutputStream);
                         break;
                 }
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
@@ -155,4 +163,15 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
         }
         return fsDataOutputStream;
     }
+
+    private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) 
throws IOException {
+        if (enableHeaderWriter) {
+            fsDataOutputStream.write(
+                    String.join(
+                                    FileFormat.CSV.equals(fileFormat) ? "," : 
fieldDelimiter,
+                                    seaTunnelRowType.getFieldNames())
+                            .getBytes());
+            fsDataOutputStream.write(rowDelimiter.getBytes());
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
new file mode 100644
index 0000000000..7d60fadcde
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test for the enable_header_write option of the LocalFile sink with text and csv output. Checks
+ * that the first line of each output file equals the column-name header only when it is enabled.
+ */
+@Slf4j
+public class TextHeaderIT {
+
+    private String FILE_FORMAT_TYPE = "file_format_type";
+    private String ENABLE_HEADER_WRITE = "enable_header_write";
+
+    @Getter
+    @Setter
+    static class ContentHeader {
+        private String fileStyle;
+        private String enableWriteHeader;
+        private String headerName;
+
+        public ContentHeader(String fileStyle, String enableWriteHeader, 
String headerName) {
+            this.fileStyle = fileStyle;
+            this.enableWriteHeader = enableWriteHeader;
+            this.headerName = headerName;
+        }
+    }
+
+    @Test
+    public void testEnableWriteHeader() {
+        List<ContentHeader> lists = new ArrayList<>();
+        lists.add(
+                new ContentHeader(
+                        "text", "true", "name" + 
TextFormatConstant.SEPARATOR[0] + "age"));
+        lists.add(
+                new ContentHeader(
+                        "text", "false", "name" + 
TextFormatConstant.SEPARATOR[0] + "age"));
+        lists.add(new ContentHeader("csv", "true", "name,age"));
+        lists.add(new ContentHeader("csv", "false", "name,age"));
+        lists.forEach(
+                t -> {
+                    try {
+                        enableWriteHeader(
+                                t.getFileStyle(), t.getEnableWriteHeader(), 
t.getHeaderName());
+                    } catch (ExecutionException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    public void enableWriteHeader(String file_format_type, String headerWrite, 
String headerContent)
+            throws ExecutionException, InterruptedException {
+        String testClusterName = 
"ClusterFaultToleranceIT_EnableWriteHeaderNode";
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getHazelcastConfig()
+                .setClusterName(TestUtils.getClusterName(testClusterName));
+
+        try {
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            1, 
finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                    createTestResources(headerWrite, file_format_type);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(headerWrite);
+
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                    
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Thread.sleep(2000);
+                                Assertions.assertTrue(
+                                        objectCompletableFuture.isDone()
+                                                && JobStatus.FINISHED.equals(
+                                                        
objectCompletableFuture.get()));
+                            });
+            File file = new File(testResources.getLeft());
+            for (File targetFile : file.listFiles()) {
+                String[] texts =
+                        FileUtils.readFileToStr(targetFile.toPath())
+                                
.split(BaseSinkConfig.ROW_DELIMITER.defaultValue());
+                if (headerWrite.equals("true")) {
+                    Assertions.assertEquals(headerContent, texts[0]);
+                } else {
+                    Assertions.assertNotEquals(headerContent, texts[0]);
+                }
+            }
+            log.info("========================clean test 
resource====================");
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
+
+    private ImmutablePair<String, String> createTestResources(
+            @NonNull String headerWrite, @NonNull String formatType) {
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put(ENABLE_HEADER_WRITE, headerWrite);
+        valueMap.put(FILE_FORMAT_TYPE, formatType);
+        String targetDir = "/tmp/text";
+        targetDir = targetDir.replace("/", File.separator);
+        // clear target dir before test
+        FileUtils.createNewDir(targetDir);
+        String targetConfigFilePath =
+                File.separator
+                        + "tmp"
+                        + File.separator
+                        + "test_conf"
+                        + File.separator
+                        + headerWrite
+                        + ".conf";
+        TestUtils.createTestConfigFileFromTemplate(
+                "batch_fakesource_to_file_header.conf", valueMap, 
targetConfigFilePath);
+        return new ImmutablePair<>(targetDir, targetConfigFilePath);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
new file mode 100644
index 0000000000..96ec46dc2e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Create a FakeSource that generates the test rows
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+
+LocalFile {
+    path = "/tmp/text"
+       file_format_type="${file_format_type}"
+       enable_header_write="${enable_header_write}"
+}
+}

Reply via email to