This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8875d0258 [Improve][Connector-V2][File] File Connector add lzo 
compression way. (#3782)
8875d0258 is described below

commit 8875d02589685cbd57742291cc6d71f6e4a70ace
Author: lightzhao <[email protected]>
AuthorDate: Fri Dec 30 11:00:25 2022 +0800

    [Improve][Connector-V2][File] File Connector add lzo compression way. 
(#3782)
    
    * add lzo compression way.
    
    * format code style
    
    * add CompressFormat enum.
    
    * Judge with switch statement.
    
    * fix CompressFormat enum valueOf adapt
    
    * Repair NPE when compression is not set.
    
    * add license header.
    
    * fix CI problem.
    
    * CompressFormat enum add default value NONE.
    
    * Supplement lzo compressed documents.
    
    * Supplement lzo compressed documents.
    
    * modify file name ends with *.lzo.txt
    
    Co-authored-by: zhaoliang01 <[email protected]>
---
 docs/en/connector-v2/sink/HdfsFile.md              | 11 +++++-
 docs/en/connector-v2/sink/Hive.md                  |  4 +-
 .../seatunnel/file/config/BaseFileSinkConfig.java  | 11 +++++-
 .../seatunnel/file/config/CompressFormat.java      | 45 ++++++++++++++++++++++
 .../file/sink/writer/AbstractWriteStrategy.java    |  9 ++++-
 .../file/sink/writer/TextWriteStrategy.java        | 22 ++++++++++-
 6 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/docs/en/connector-v2/sink/HdfsFile.md 
b/docs/en/connector-v2/sink/HdfsFile.md
index 09e91c2dc..c17c59683 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -19,6 +19,9 @@ By default, we use 2PC commit to ensure `exactly-once`
   - [x] parquet
   - [x] orc
   - [x] json
+- [x] compress codec
+  - [x] lzo
+
 
 ## Options
 
@@ -41,6 +44,7 @@ In order to use this connector, You must ensure your 
spark/flink cluster already
 | is_enable_transaction            | boolean | no       | true                 
                                     |
 | batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
+| compressCodec                    | string  | no       | none                 
                                     |
 
 ### fs.defaultFS [string]
 
@@ -125,8 +129,10 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of 
lines in the file is determined by `batch_size` and `checkpoint.interval` 
jointly decide. If the value of `checkpoint.interval` is large enough, sink 
writer will write rows in a file until the rows in the file larger than 
`batch_size`. If `checkpoint.interval` is small, the sink writer will create a 
new file when a new checkpoint trigger.
 
-### common options
+### compressCodec [string]
+Supports lzo compression for the text file format. The generated file name ends with ".lzo.txt".
 
+### common options
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
 
 ## Example
@@ -207,4 +213,5 @@ HdfsFile {
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
 
-- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+- [Improve] Support lzo compression for text in file format 
([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Hive.md 
b/docs/en/connector-v2/sink/Hive.md
index 01cc43d71..607a20a0a 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -21,6 +21,8 @@ By default, we use 2PC commit to ensure `exactly-once`
   - [x] text
   - [x] parquet
   - [x] orc
+- [x] compress codec
+  - [x] lzo
 
 ## Options
 
@@ -28,8 +30,8 @@ By default, we use 2PC commit to ensure `exactly-once`
 |----------------|--------|----------|---------------|
 | table_name     | string | yes      | -             |
 | metastore_uri  | string | yes      | -             |
+| compressCodec  | string | no       | none          |
 | common-options |        | no       | -             |
-
 ### table_name [string]
 
 Target Hive table name eg: db1.table1
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 2befb3ae6..42b31adc9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -50,8 +50,15 @@ public class BaseFileSinkConfig implements DelimiterConfig, 
CompressConfig, Seri
 
     public BaseFileSinkConfig(@NonNull Config config) {
         if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
-            throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
-                    "Compress not supported by SeaTunnel file connector now");
+            CompressFormat compressFormat = 
CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT));
+            switch (compressFormat) {
+                case LZO:
+                    this.compressCodec = compressFormat.getCompressCodec();
+                    break;
+                default:
+                    throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "Compress not supported this compress code by 
SeaTunnel file connector now");
+            }
         }
         if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
             this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
new file mode 100644
index 000000000..6449f1845
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum CompressFormat implements Serializable {
+
+    LZO("lzo"),
+    NONE("none");
+
+    private final String compressCodec;
+
+    CompressFormat(String compressCodec) {
+        this.compressCodec = compressCodec;
+    }
+
+    public String getCompressCodec() {
+        return compressCodec;
+    }
+
+    public static CompressFormat getCompressFormat(String value) {
+        switch (value) {
+            case "lzo":
+                return CompressFormat.LZO;
+            default:
+                return CompressFormat.NONE;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 264af8af3..50c1f365e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.VariablesSubstitute;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -197,8 +198,12 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     public String generateFileName(String transactionId) {
         String fileNameExpression = fileSinkConfig.getFileNameExpression();
         FileFormat fileFormat = fileSinkConfig.getFileFormat();
+        String suffix = fileFormat.getSuffix();
+        if 
(CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec()))
 {
+            suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + 
suffix;
+        }
         if (StringUtils.isBlank(fileNameExpression)) {
-            return transactionId + fileFormat.getSuffix();
+            return transactionId + suffix;
         }
         String timeFormat = fileSinkConfig.getFileNameTimeFormat();
         DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
@@ -209,7 +214,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         valuesMap.put(timeFormat, formattedDate);
         valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
         String substitute = VariablesSubstitute.substitute(fileNameExpression, 
valuesMap) + "_" + partId;
-        return substitute + fileFormat.getSuffix();
+        return substitute + suffix;
     }
 
     /**
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index ebd491b08..b0ba66746 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,15 +24,19 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
+import io.airlift.compress.lzo.LzopCodec;
 import lombok.NonNull;
 import org.apache.hadoop.fs.FSDataOutputStream;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 public class TextWriteStrategy extends AbstractWriteStrategy {
@@ -44,6 +48,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
     private final DateTimeUtils.Formatter dateTimeFormat;
     private final TimeUtils.Formatter timeFormat;
     private SerializationSchema serializationSchema;
+    private String compressCodec;
 
     public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
         super(textFileSinkConfig);
@@ -54,6 +59,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
         this.dateFormat = textFileSinkConfig.getDateFormat();
         this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
         this.timeFormat = textFileSinkConfig.getTimeFormat();
+        this.compressCodec = textFileSinkConfig.getCompressCodec();
     }
 
     @Override
@@ -111,7 +117,21 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
         FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
-                fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                if (compressCodec != null) {
+                    CompressFormat compressFormat = 
CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
+                    switch (compressFormat) {
+                        case LZO:
+                            LzopCodec lzo = new LzopCodec();
+                            OutputStream out = 
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
+                            fsDataOutputStream = new FSDataOutputStream(out, 
null);
+                            break;
+                        default:
+                            fsDataOutputStream = 
fileSystemUtils.getOutputStream(filePath);
+                    }
+                } else {
+                    fsDataOutputStream = 
fileSystemUtils.getOutputStream(filePath);
+                }
+
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {

Reply via email to