This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8875d0258 [Improve][Connector-V2][File] File Connector add lzo
compression way. (#3782)
8875d0258 is described below
commit 8875d02589685cbd57742291cc6d71f6e4a70ace
Author: lightzhao <[email protected]>
AuthorDate: Fri Dec 30 11:00:25 2022 +0800
[Improve][Connector-V2][File] File Connector add lzo compression way.
(#3782)
* add lzo compression way.
* format code style
* add CompressFormat enum.
* Judge with switch statement.
* fix CompressFormat enum valueOf adapt
* Repair NPE when compression is not set.
* add license header.
* fix CI problem.
* CompressFormat enum add default value NONE.
* Supplement lzo compressed documents.
* Supplement lzo compressed documents.
* modify file name ends with *.lzo.txt
Co-authored-by: zhaoliang01 <[email protected]>
---
docs/en/connector-v2/sink/HdfsFile.md | 11 +++++-
docs/en/connector-v2/sink/Hive.md | 4 +-
.../seatunnel/file/config/BaseFileSinkConfig.java | 11 +++++-
.../seatunnel/file/config/CompressFormat.java | 45 ++++++++++++++++++++++
.../file/sink/writer/AbstractWriteStrategy.java | 9 ++++-
.../file/sink/writer/TextWriteStrategy.java | 22 ++++++++++-
6 files changed, 94 insertions(+), 8 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md
b/docs/en/connector-v2/sink/HdfsFile.md
index 09e91c2dc..c17c59683 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -19,6 +19,9 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] parquet
- [x] orc
- [x] json
+- [x] compress codec
+ - [x] lzo
+
## Options
@@ -41,6 +44,7 @@ In order to use this connector, You must ensure your
spark/flink cluster already
| is_enable_transaction | boolean | no | true
|
| batch_size | int | no | 1000000
|
| common-options | | no | -
|
+| compressCodec | string | no | none
|
### fs.defaultFS [string]
@@ -125,8 +129,10 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of
lines in the file is determined by `batch_size` and `checkpoint.interval`
jointly decide. If the value of `checkpoint.interval` is large enough, sink
writer will write rows in a file until the rows in the file larger than
`batch_size`. If `checkpoint.interval` is small, the sink writer will create a
new file when a new checkpoint trigger.
-### common options
+### compressCodec [string]
+Supports LZO compression for files written in the `text` file format. Compressed file names end with `.lzo.txt`.
+### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
## Example
@@ -207,4 +213,5 @@ HdfsFile {
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
-- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+- [Improve] Support LZO compression for the text file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 01cc43d71..607a20a0a 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -21,6 +21,8 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] text
- [x] parquet
- [x] orc
+- [x] compress codec
+ - [x] lzo
## Options
@@ -28,8 +30,8 @@ By default, we use 2PC commit to ensure `exactly-once`
|----------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
+| compressCodec | string | no | none |
| common-options | | no | - |
-
### table_name [string]
Target Hive table name eg: db1.table1
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 2befb3ae6..42b31adc9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -50,8 +50,15 @@ public class BaseFileSinkConfig implements DelimiterConfig,
CompressConfig, Seri
public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
- throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
- "Compress not supported by SeaTunnel file connector now");
+ CompressFormat compressFormat =
CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT));
+ switch (compressFormat) {
+ case LZO:
+ this.compressCodec = compressFormat.getCompressCodec();
+ break;
+ default:
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Compress not supported this compress code by
SeaTunnel file connector now");
+ }
}
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
new file mode 100644
index 000000000..6449f1845
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+/** Compression codecs supported by the file sink; each constant carries its lower-case codec name. */
+public enum CompressFormat implements Serializable {
+    LZO("lzo"),
+    NONE("none");
+
+    private final String compressCodec;
+
+    CompressFormat(String compressCodec) {
+        this.compressCodec = compressCodec;
+    }
+    /** Returns the lower-case codec name of this constant, e.g. "lzo". */
+    public String getCompressCodec() {
+        return compressCodec;
+    }
+    /** Resolves a codec name case-insensitively; a null or unrecognized name resolves to NONE. */
+    public static CompressFormat getCompressFormat(String value) {
+        for (CompressFormat format : values()) {
+            if (format.compressCodec.equalsIgnoreCase(value)) {
+                return format;
+            }
+        }
+        return CompressFormat.NONE;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 264af8af3..50c1f365e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -197,8 +198,12 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
public String generateFileName(String transactionId) {
String fileNameExpression = fileSinkConfig.getFileNameExpression();
FileFormat fileFormat = fileSinkConfig.getFileFormat();
+ String suffix = fileFormat.getSuffix();
+ if
(CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec()))
{
+ suffix = "." + CompressFormat.LZO.getCompressCodec() + "." +
suffix;
+ }
if (StringUtils.isBlank(fileNameExpression)) {
- return transactionId + fileFormat.getSuffix();
+ return transactionId + suffix;
}
String timeFormat = fileSinkConfig.getFileNameTimeFormat();
DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
@@ -209,7 +214,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
valuesMap.put(timeFormat, formattedDate);
valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
String substitute = VariablesSubstitute.substitute(fileNameExpression,
valuesMap) + "_" + partId;
- return substitute + fileFormat.getSuffix();
+ return substitute + suffix;
}
/**
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index ebd491b08..b0ba66746 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,15 +24,19 @@ import
org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
+import io.airlift.compress.lzo.LzopCodec;
import lombok.NonNull;
import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
public class TextWriteStrategy extends AbstractWriteStrategy {
@@ -44,6 +48,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
private SerializationSchema serializationSchema;
+ private String compressCodec;
public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
@@ -54,6 +59,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
this.dateFormat = textFileSinkConfig.getDateFormat();
this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
this.timeFormat = textFileSinkConfig.getTimeFormat();
+ this.compressCodec = textFileSinkConfig.getCompressCodec();
}
@Override
@@ -111,7 +117,21 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
- fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ if (compressCodec != null) {
+ CompressFormat compressFormat =
CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
+ switch (compressFormat) {
+ case LZO:
+ LzopCodec lzo = new LzopCodec();
+ OutputStream out =
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
+ fsDataOutputStream = new FSDataOutputStream(out,
null);
+ break;
+ default:
+ fsDataOutputStream =
fileSystemUtils.getOutputStream(filePath);
+ }
+ } else {
+ fsDataOutputStream =
fileSystemUtils.getOutputStream(filePath);
+ }
+
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {