This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f73b37291e [Feature] [File Connector] Supports writing column names
when the output type is file (CSV) (#5459)
f73b37291e is described below
commit f73b37291e6eedb38887f7cc0df150020b52d7c8
Author: dian <[email protected]>
AuthorDate: Mon Sep 25 16:36:09 2023 +0800
[Feature] [File Connector] Supports writing column names when the output
type is file (CSV) (#5459)
* [Feature] [File Connector] Supports writing column names when the output
type is file (CSV) #5443
* [Feature] [File Connector] fix code style and lineSeparator #5443
* [Feature] [File Connector] add enable_header_write,false:dont write
header,true:write header. #5443
* [Feature] [File Connector] fix code style #5443
* [Feature] [File Connector] add enable_header_write explain #5443
* [Feature] [File Connector]fix code style #5443
* Update docs/en/connector-v2/sink/LocalFile.md
* Update docs/en/connector-v2/sink/LocalFile.md
* [Feature] [File Connector]fix code style #5443
* [Feature] [File Connector]add junit test #5443
* [Feature] [File Connector]add license header: #5443
* [Feature] [File Connector] Supports writing column names when the output
type is file (CSV) #5443
* [Feature] [File Connector] fix code style and lineSeparator #5443
* [Feature] [File Connector] add enable_header_write,false:dont write
header,true:write header. #5443
* [Feature] [File Connector] fix code style #5443
* [Feature] [File Connector] add enable_header_write explain #5443
* [Feature] [File Connector]fix code style #5443
* Update docs/en/connector-v2/sink/LocalFile.md
* Update docs/en/connector-v2/sink/LocalFile.md
* [Feature] [File Connector]fix code style #5443
* [Feature] [File Connector]add junit test #5443
* [Feature] [File Connector]add license header: #5443
* [Feature] [File Connector]add junit: #5443
* [Feature] [File Connector]add junit: #5443
* [Feature] [File Connector]remove scala: #5443
* [Feature] [File Connector]modify md style: #5443
* [Feature] [File Connector] Supports writing column names when the output
type is file (CSV) #5443
* [Feature] [File Connector] fix code style and lineSeparator #5443
* [Feature] [File Connector] add enable_header_write,false:dont write
header,true:write header. #5443
* [Feature] [File Connector] fix code style #5443
* [Feature] [File Connector] add enable_header_write explain #5443
* [Feature] [File Connector]fix code style #5443
* Update docs/en/connector-v2/sink/LocalFile.md
* Update docs/en/connector-v2/sink/LocalFile.md
* [Feature] [File Connector]fix code style #5443
* [Feature] [File Connector]add junit test #5443
* [Feature] [File Connector]add license header: #5443
* [Feature] [File Connector]add junit: #5443
* [Feature] [File Connector]add junit: #5443
* [Feature] [File Connector]remove scala: #5443
* [Feature] [File Connector]modify md style: #5443
* [Feature] [File Connector]junit modify: #5443
---------
Co-authored-by: zck <[email protected]>
Co-authored-by: Eric <[email protected]>
---
docs/en/connector-v2/sink/LocalFile.md | 45 ++---
.../seatunnel/file/config/BaseFileSinkConfig.java | 5 +
.../seatunnel/file/config/BaseSinkConfig.java | 6 +
.../file/sink/writer/TextWriteStrategy.java | 19 ++
.../apache/seatunnel/engine/e2e/TextHeaderIT.java | 192 +++++++++++++++++++++
.../resources/batch_fakesource_to_file_header.conf | 49 ++++++
6 files changed, 296 insertions(+), 20 deletions(-)
diff --git a/docs/en/connector-v2/sink/LocalFile.md
b/docs/en/connector-v2/sink/LocalFile.md
index 8e2c1526e9..90e80c6c37 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -30,26 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once`
## Options
-| name | type | required |
default value | remarks
|
-|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
-| path | string | yes | -
|
|
-| custom_filename | boolean | no | false
| Whether you need custom the filename
|
-| file_name_expression | string | no | "${transactionId}"
| Only used when custom_filename is true
|
-| filename_time_format | string | no | "yyyy.MM.dd"
| Only used when custom_filename is true
|
-| file_format_type | string | no | "csv"
|
|
-| field_delimiter | string | no | '\001'
| Only used when file_format_type is text
|
-| row_delimiter | string | no | "\n"
| Only used when file_format_type is text
|
-| have_partition | boolean | no | false
| Whether you need processing partitions.
|
-| partition_by | array | no | -
| Only used then have_partition is true
|
-| partition_dir_expression | string | no |
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is
true |
-| is_partition_field_write_in_file | boolean | no | false
| Only used then have_partition is true
|
-| sink_columns | array | no |
| When this parameter is empty, all fields are sink
columns |
-| is_enable_transaction | boolean | no | true
|
|
-| batch_size | int | no | 1000000
|
|
-| compress_codec | string | no | none
|
|
-| common-options | object | no | -
|
|
-| max_rows_in_memory | int | no | -
| Only used when file_format_type is excel.
|
-| sheet_name | string | no | Sheet${Random
number} | Only used when file_format_type is excel.
|
+| name | type | required |
default value |
remarks |
+|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------|
+| path | string | yes | -
|
|
+| custom_filename | boolean | no | false
| Whether you need to customize the filename
|
+| file_name_expression | string | no | "${transactionId}"
| Only used when custom_filename is true
|
+| filename_time_format | string | no | "yyyy.MM.dd"
| Only used when custom_filename is true
|
+| file_format_type | string | no | "csv"
|
|
+| field_delimiter | string | no | '\001'
| Only used when file_format_type is text
|
+| row_delimiter | string | no | "\n"
| Only used when file_format_type is text
|
+| have_partition | boolean | no | false
| Whether you need to process partitions.
|
+| partition_by | array | no | -
| Only used when have_partition is true
|
+| partition_dir_expression | string | no |
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is
true |
+| is_partition_field_write_in_file | boolean | no | false
| Only used when have_partition is true
|
+| sink_columns | array | no |
| When this parameter is empty, all fields are sink
columns |
+| is_enable_transaction | boolean | no | true
|
|
+| batch_size | int | no | 1000000
|
|
+| compress_codec | string | no | none
|
|
+| common-options | object | no | -
|
|
+| max_rows_in_memory | int | no | -
| Only used when file_format_type is excel.
|
+| sheet_name | string | no | Sheet${Random
number} | Only used when file_format_type is excel.
|
+| enable_header_write | boolean | no | false | Only used when file_format_type is text or csv.<br/> false: don't write header, true: write header. |
### path [string]
@@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items
that can be cached in
Writer the sheet of the workbook
+### enable_header_write [boolean]
+
+Only used when file_format_type is text or csv. false: don't write the header; true: write the column names as the first line of each file.
+
## Example
For orc file format simple config
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 9a0ac6c678..112ab9fa1c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig,
Serializable {
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+ protected Boolean enableHeaderWriter = false;
public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
@@ -99,6 +100,10 @@ public class BaseFileSinkConfig implements DelimiterConfig,
Serializable {
timeFormat =
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
}
+
+ if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
+ enableHeaderWriter =
config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
+ }
}
public BaseFileSinkConfig() {}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 431a5d1daa..4f4d09d75c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -232,4 +232,10 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("To be written sheet name,only valid for
excel files");
+
+    /** Whether text/csv sinks write the column names as a header line; defaults to false. */
+    public static final Option<Boolean> ENABLE_HEADER_WRITE =
+            Options.key("enable_header_write")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to write the column names as the first line of the file, "
+                                    + "only valid for text and csv files. "
+                                    + "false: don't write header, true: write header");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index f309edb70f..b4b7bdb955 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateUtils.Formatter dateFormat;
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
+ private final FileFormat fileFormat;
+ private final Boolean enableHeaderWriter;
private SerializationSchema serializationSchema;
public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -58,6 +61,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
this.dateFormat = fileSinkConfig.getDateFormat();
this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
this.timeFormat = fileSinkConfig.getTimeFormat();
+ this.fileFormat = fileSinkConfig.getFileFormat();
+ this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
}
@Override
@@ -133,15 +138,18 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
OutputStream out =
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
fsDataOutputStream = new FSDataOutputStream(out, null);
+ enableWriteHeader(fsDataOutputStream);
break;
case NONE:
fsDataOutputStream =
fileSystemUtils.getOutputStream(filePath);
+ enableWriteHeader(fsDataOutputStream);
break;
default:
log.warn(
"Text file does not support this compress
type: {}",
compressFormat.getCompressCodec());
fsDataOutputStream =
fileSystemUtils.getOutputStream(filePath);
+ enableWriteHeader(fsDataOutputStream);
break;
}
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
@@ -155,4 +163,15 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
}
return fsDataOutputStream;
}
+
+    /**
+     * Writes a header line to a newly opened output stream when {@code enableHeaderWriter} is set.
+     *
+     * <p>The header consists of the field names of {@code seaTunnelRowType}, joined by "," for CSV
+     * files and by {@code fieldDelimiter} for every other format, followed by {@code rowDelimiter}.
+     * Both are encoded with {@code String#getBytes()} (platform default charset).
+     *
+     * <p>NOTE(review): every field of {@code seaTunnelRowType} is listed; if sink_columns narrows the
+     * columns actually written, the header may not line up with the data rows — confirm.
+     *
+     * @param fsDataOutputStream the stream the header is written to
+     * @throws IOException if writing to the stream fails
+     */
+    private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
+        if (!enableHeaderWriter) {
+            return;
+        }
+        String separator = FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter;
+        String headerLine = String.join(separator, seaTunnelRowType.getFieldNames());
+        fsDataOutputStream.write(headerLine.getBytes());
+        fsDataOutputStream.write(rowDelimiter.getBytes());
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
new file mode 100644
index 0000000000..7d60fadcde
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * End-to-end test for the {@code enable_header_write} option of the LocalFile sink. For the text
+ * and csv file formats it runs a FakeSource-to-LocalFile batch job on a single-node SeaTunnel
+ * cluster and checks whether the first line of every output file is the column header.
+ */
+@Slf4j
+public class TextHeaderIT {
+
+    private static final String FILE_FORMAT_TYPE = "file_format_type";
+    private static final String ENABLE_HEADER_WRITE = "enable_header_write";
+
+    /** One test case: file format, whether the header is enabled, and the expected header line. */
+    @Getter
+    @Setter
+    static class ContentHeader {
+        private String fileStyle;
+        private String enableWriteHeader;
+        private String headerName;
+
+        public ContentHeader(String fileStyle, String enableWriteHeader, String headerName) {
+            this.fileStyle = fileStyle;
+            this.enableWriteHeader = enableWriteHeader;
+            this.headerName = headerName;
+        }
+    }
+
+    @Test
+    public void testEnableWriteHeader() throws ExecutionException, InterruptedException {
+        // expected text header uses TextFormatConstant.SEPARATOR[0]; assumed to match the default
+        // text field_delimiter — csv always uses ","
+        String textHeader = "name" + TextFormatConstant.SEPARATOR[0] + "age";
+        List<ContentHeader> cases = new ArrayList<>();
+        cases.add(new ContentHeader("text", "true", textHeader));
+        cases.add(new ContentHeader("text", "false", textHeader));
+        cases.add(new ContentHeader("csv", "true", "name,age"));
+        cases.add(new ContentHeader("csv", "false", "name,age"));
+        // a plain loop lets the checked exceptions propagate instead of wrapping them
+        for (ContentHeader testCase : cases) {
+            enableWriteHeader(
+                    testCase.getFileStyle(),
+                    testCase.getEnableWriteHeader(),
+                    testCase.getHeaderName());
+        }
+    }
+
+    /**
+     * Runs one FakeSource-to-LocalFile job and checks the first line of every output file.
+     *
+     * @param fileFormatType value for the sink's {@code file_format_type} option
+     * @param headerWrite value for the sink's {@code enable_header_write} option ("true"/"false")
+     * @param headerContent the header line expected when {@code headerWrite} is "true"
+     */
+    public void enableWriteHeader(String fileFormatType, String headerWrite, String headerContent)
+            throws ExecutionException, InterruptedException {
+        String testClusterName = "TextHeaderIT_EnableWriteHeaderNode";
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getHazelcastConfig()
+                .setClusterName(TestUtils.getClusterName(testClusterName));
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // wait until the single node has joined the cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            1, finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                    createTestResources(headerWrite, fileFormatType);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(fileFormatType + "_" + headerWrite);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> jobStatusFuture =
+                    CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            // poll every 2 seconds instead of sleeping inside the assertion
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .pollInterval(2000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            jobStatusFuture.isDone()
+                                                    && JobStatus.FINISHED.equals(
+                                                            jobStatusFuture.get())));
+
+            String outputDir = testResources.getLeft();
+            File[] targetFiles = new File(outputDir).listFiles(File::isFile);
+            // listFiles returns null for a missing directory; an empty result would make the
+            // per-file checks below pass without checking anything
+            Assertions.assertNotNull(targetFiles, "output directory not found: " + outputDir);
+            Assertions.assertTrue(targetFiles.length > 0, "no output file in " + outputDir);
+            for (File targetFile : targetFiles) {
+                String[] lines =
+                        FileUtils.readFileToStr(targetFile.toPath())
+                                .split(BaseSinkConfig.ROW_DELIMITER.defaultValue());
+                if ("true".equals(headerWrite)) {
+                    Assertions.assertEquals(headerContent, lines[0]);
+                } else {
+                    Assertions.assertNotEquals(headerContent, lines[0]);
+                }
+            }
+            log.info("========================clean test resource====================");
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Prepares the sink output directory and renders the job config template for one test case.
+     *
+     * @param headerWrite value substituted for {@code ${enable_header_write}}
+     * @param formatType value substituted for {@code ${file_format_type}}
+     * @return left: the sink output directory; right: the path of the rendered job config file
+     */
+    private ImmutablePair<String, String> createTestResources(
+            @NonNull String headerWrite, @NonNull String formatType) {
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put(ENABLE_HEADER_WRITE, headerWrite);
+        valueMap.put(FILE_FORMAT_TYPE, formatType);
+        // must match the LocalFile path in batch_fakesource_to_file_header.conf
+        String targetDir = "/tmp/text".replace("/", File.separator);
+        // clear target dir before test
+        FileUtils.createNewDir(targetDir);
+        // include the format so each case renders its own config file
+        String targetConfigFilePath =
+                File.separator
+                        + "tmp"
+                        + File.separator
+                        + "test_conf"
+                        + File.separator
+                        + formatType
+                        + "_"
+                        + headerWrite
+                        + ".conf";
+        TestUtils.createTestConfigFileFromTemplate(
+                "batch_fakesource_to_file_header.conf", valueMap, targetConfigFilePath);
+        return new ImmutablePair<>(targetDir, targetConfigFilePath);
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
new file mode 100644
index 0000000000..96ec46dc2e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config, used by TextHeaderIT to test enable_header_write
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Create a FakeSource that generates a single row with a name and an age field
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+# Write the rows to local files; the ${...} placeholders are filled in by TextHeaderIT
+sink {
+  LocalFile {
+    path = "/tmp/text"
+    file_format_type = "${file_format_type}"
+    enable_header_write = "${enable_header_write}"
+  }
+}