This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b41fcda6eae branch-3.1: [feat](outfile) support compression type for 
csv format in outfile and export #55392 (#55561)
b41fcda6eae is described below

commit b41fcda6eaecf9a7b3d0a4fe239f4a3d748f7bdd
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 4 10:36:45 2025 +0800

    branch-3.1: [feat](outfile) support compression type for csv format in 
outfile and export #55392 (#55561)
    
    Cherry-picked from #55392
    
    Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
 be/src/pipeline/exec/result_sink_operator.h        |   6 +
 be/src/vec/sink/writer/vfile_result_writer.cpp     |  27 ++++-
 be/src/vec/sink/writer/vfile_result_writer.h       |   2 +
 .../org/apache/doris/analysis/OutFileClause.java   |   5 +
 .../java/org/apache/doris/common/util/Util.java    |   4 +-
 .../fileformat/CsvFileFormatProperties.java        |  32 +++++
 .../main/java/org/apache/doris/load/ExportJob.java |  11 +-
 .../fileformat/CsvFileFormatPropertiesTest.java    |   8 +-
 .../fileformat/TextFileFormatPropertiesTest.java   |   5 +-
 gensrc/thrift/DataSinks.thrift                     |   3 +
 regression-test/data/export_p0/test_export_csv.out | Bin 21332 -> 24555 bytes
 .../data/export_p0/test_outfile_csv_compress.out   | Bin 0 -> 1729 bytes
 .../suites/export_p0/test_export_csv.groovy        |  69 +++++++++++
 .../export_p0/test_outfile_csv_compress.groovy     | 131 +++++++++++++++++++++
 14 files changed, 285 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 06544f9bc18..1c2b669e02f 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -54,6 +54,9 @@ struct ResultFileOptions {
     bool is_refactor_before_flag = false;
     std::string orc_schema;
     TFileCompressType::type orc_compression_type;
+    // currently only for csv
+    // TODO: we should merge 
parquet_compression_type/orc_compression_type/compression_type
+    TFileCompressType::type compression_type = TFileCompressType::PLAIN;
 
     bool delete_existing_files = false;
     std::string file_suffix;
@@ -116,6 +119,9 @@ struct ResultFileOptions {
         if (t_opt.__isset.orc_writer_version) {
             orc_writer_version = t_opt.orc_writer_version;
         }
+        if (t_opt.__isset.compression_type) {
+            compression_type = t_opt.compression_type;
+        }
     }
 };
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 555b44ea3df..ad29714cc55 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -126,10 +126,10 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
             }));
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
-        _vfile_writer.reset(new VCSVTransformer(_state, 
_file_writer_impl.get(),
-                                                _vec_output_expr_ctxs, 
_output_object_data,
-                                                _header_type, _header, 
_file_opts->column_separator,
-                                                _file_opts->line_delimiter, 
_file_opts->with_bom));
+        _vfile_writer.reset(new VCSVTransformer(
+                _state, _file_writer_impl.get(), _vec_output_expr_ctxs, 
_output_object_data,
+                _header_type, _header, _file_opts->column_separator, 
_file_opts->line_delimiter,
+                _file_opts->with_bom, _file_opts->compression_type));
         break;
     case TFileFormatType::FORMAT_PARQUET:
         _vfile_writer.reset(new VParquetTransformer(
@@ -195,7 +195,7 @@ void VFileResultWriter::_get_file_url(std::string* 
file_url) {
 std::string VFileResultWriter::_file_format_to_name() {
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
-        return "csv";
+        return "csv" + _compression_type_to_name();
     case TFileFormatType::FORMAT_PARQUET:
         return "parquet";
     case TFileFormatType::FORMAT_ORC:
@@ -205,6 +205,23 @@ std::string VFileResultWriter::_file_format_to_name() {
     }
 }
 
+std::string VFileResultWriter::_compression_type_to_name() {
+    switch (_file_opts->compression_type) {
+    case TFileCompressType::GZ:
+        return ".gzip";
+    case TFileCompressType::BZ2:
+        return ".bzip2";
+    case TFileCompressType::SNAPPYBLOCK:
+        return ".snappy";
+    case TFileCompressType::LZ4BLOCK:
+        return ".lz4";
+    case TFileCompressType::ZSTD:
+        return ".zstd";
+    default:
+        return "";
+    }
+}
+
 Status VFileResultWriter::write(RuntimeState* state, Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index 8b611d7ceef..209e1af1d69 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -101,7 +101,9 @@ private:
     // delete the dir of file_path
     Status _delete_dir();
     double _get_write_speed(int64_t write_bytes, int64_t write_time);
+    std::string _compression_type_to_name();
 
+private:
     RuntimeState* _state; // not owned, set when init
     const pipeline::ResultFileOptions* _file_opts = nullptr;
     TStorageBackendType::type _storage_type;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 8211f84a21c..2edc3b4660f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -504,6 +504,11 @@ public class OutFileClause {
         analyzeBrokerDesc(copiedProps);
 
         fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
+        // check if compression type for csv is supported
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
+            CsvFileFormatProperties csvFileFormatProperties = 
(CsvFileFormatProperties) fileFormatProperties;
+            csvFileFormatProperties.checkSupportedCompressionType(true);
+        }
 
         if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
             maxFileSizeBytes = 
ParseUtil.analyzeDataVolume(copiedProps.get(PROP_MAX_FILE_SIZE));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 296f76d1f79..ab14b529441 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -610,7 +610,7 @@ public class Util {
         }
     }
 
-    public static TFileCompressType getFileCompressType(String compressType) {
+    public static TFileCompressType getFileCompressType(String compressType) 
throws AnalysisException {
         if (Strings.isNullOrEmpty(compressType)) {
             return TFileCompressType.UNKNOWN;
         }
@@ -618,7 +618,7 @@ public class Util {
         try {
             return TFileCompressType.valueOf(upperCaseType);
         } catch (IllegalArgumentException e) {
-            return TFileCompressType.UNKNOWN;
+            throw new AnalysisException("Unknown compression type: " + 
compressType);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
index 25bd0c469db..2ed59a818f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -22,20 +22,35 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TResultFileSinkOptions;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Map;
+import java.util.Set;
 
 public class CsvFileFormatProperties extends FileFormatProperties {
     public static final Logger LOG = LogManager.getLogger(
             
org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);
 
+    // supported compression types for csv writer
+    public static final Set<TFileCompressType> 
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES = Sets.newHashSet();
+
+    static {
+        SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.PLAIN);
+        SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.GZ);
+        SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.BZ2);
+        
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.SNAPPYBLOCK);
+        SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.LZ4BLOCK);
+        SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.ZSTD);
+    }
+
     public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     public static final String DEFAULT_LINE_DELIMITER = "\n";
 
@@ -119,6 +134,7 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
                 throw new AnalysisException("skipLines should not be less than 
0.");
             }
 
+            // The default value is "UNKNOWN", so that the caller may infer 
the compression type from the file suffix.
             String compressTypeStr = getOrDefault(formatProperties,
                     PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
             compressionType = Util.getFileCompressType(compressTypeStr);
@@ -140,10 +156,26 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
         }
     }
 
+    public void checkSupportedCompressionType(boolean isWrite) {
+        // Currently, this check only applies to write operations,
+        // because only a subset of compression types is supported for writing.
+        if (isWrite) {
+            // "UNKNOWN" means user does not specify the compression type
+            if (this.compressionType == TFileCompressType.UNKNOWN) {
+                this.compressionType = TFileCompressType.PLAIN;
+            }
+            if 
(!SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.contains(this.compressionType)) {
+                throw new AnalysisException(
+                        "csv compression type [" + this.compressionType.name() 
+ "] is invalid for writing");
+            }
+        }
+    }
+
     @Override
     public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) 
{
         sinkOptions.setColumnSeparator(columnSeparator);
         sinkOptions.setLineDelimiter(lineDelimiter);
+        sinkOptions.setCompressionType(compressionType);
     }
 
     // The method `analyzeFileFormatProperties` must have been called once 
before this method
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 434deaf440b..be77d591ab8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -578,13 +578,12 @@ public class ExportJob implements Writable {
         if (format.equals("csv") || format.equals("csv_with_names") || 
format.equals("csv_with_names_and_types")) {
             outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, 
columnSeparator);
             outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, 
lineDelimiter);
-        } else {
-            // orc / parquet
-            // compressType == null means outfile will use default compression 
type
-            if (compressType != null) {
-                outfileProperties.put(ExportCommand.COMPRESS_TYPE, 
compressType);
-            }
         }
+        // compressType == null means outfile will use default compression type
+        if (compressType != null) {
+            outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
+        }
+
         if (!maxFileSize.isEmpty()) {
             outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, 
maxFileSize);
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
index 4b2550cfa52..1482c84055d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.property.fileformat;
 
+import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.thrift.TFileCompressType;
 
@@ -130,17 +131,18 @@ public class CsvFileFormatPropertiesTest {
     public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
         Map<String, String> properties = new HashMap<>();
         properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
-        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
-        Assert.assertEquals(TFileCompressType.UNKNOWN, 
csvFileFormatProperties.getCompressionType());
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+                "Unknown compression type: invalid",
+                () -> 
csvFileFormatProperties.analyzeFileFormatProperties(properties, true));
     }
 
     @Test
     public void testAnalyzeFileFormatPropertiesValidCompressType() throws 
AnalysisException {
         Map<String, String> properties = new HashMap<>();
         properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "gz");
-
         csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
         Assert.assertEquals(TFileCompressType.GZ, 
csvFileFormatProperties.getCompressionType());
+        ExceptionChecker.expectThrowsNoException(() -> 
csvFileFormatProperties.checkSupportedCompressionType(true));
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
index b8f694b40bc..2ed04c234b7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.property.fileformat;
 
+import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.thrift.TFileCompressType;
 
@@ -93,8 +94,8 @@ public class TextFileFormatPropertiesTest {
     public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
         Map<String, String> properties = new HashMap<>();
         properties.put(TextFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
-        textFileFormatProperties.analyzeFileFormatProperties(properties, true);
-        Assert.assertEquals(TFileCompressType.UNKNOWN, 
textFileFormatProperties.getCompressionType());
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Unknown 
compression type: invalid",
+                () -> 
textFileFormatProperties.analyzeFileFormatProperties(properties, true));
     }
 
     @Test
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 73f91c84311..77862d1c5e4 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -145,6 +145,9 @@ struct TResultFileSinkOptions {
     //hive write sink use int96
     //export data to file use by user define properties
     21: optional bool enable_int96_timestamps
+    // currently only for csv
+    // TODO: merge with parquet_compression_type and orc_compression_type
+    22: optional PlanNodes.TFileCompressType compression_type
 }
 
 struct TMemoryScratchSink {
diff --git a/regression-test/data/export_p0/test_export_csv.out 
b/regression-test/data/export_p0/test_export_csv.out
index 9952ded9179..e1d8251f420 100644
Binary files a/regression-test/data/export_p0/test_export_csv.out and 
b/regression-test/data/export_p0/test_export_csv.out differ
diff --git a/regression-test/data/export_p0/test_outfile_csv_compress.out 
b/regression-test/data/export_p0/test_outfile_csv_compress.out
new file mode 100644
index 00000000000..48ae4946778
Binary files /dev/null and 
b/regression-test/data/export_p0/test_outfile_csv_compress.out differ
diff --git a/regression-test/suites/export_p0/test_export_csv.groovy 
b/regression-test/suites/export_p0/test_export_csv.groovy
index 09d06996b7f..9091fa55e6b 100644
--- a/regression-test/suites/export_p0/test_export_csv.groovy
+++ b/regression-test/suites/export_p0/test_export_csv.groovy
@@ -403,5 +403,74 @@ suite("test_export_csv", "p0") {
         delete_files.call("${outFilePath}")
     }
 
+    // 5. test csv with compression
+    uuid = UUID.randomUUID().toString()
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
+    label = "label_${uuid}"
+    try {
+        // check export path
+        check_path_exists.call("${outFilePath}")
+
+        // exec export
+        sql """
+            EXPORT TABLE ${table_export_name} where user_id < 11 TO 
"file://${outFilePath}/"
+            PROPERTIES(
+                "label" = "${label}",
+                "format" = "csv",
+                "compress_type"="gz"
+            );
+        """
+        waiting_export.call(label)
+        
+        // check data correctness
+        sql """ DROP TABLE IF EXISTS ${table_load_name} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${table_load_name} (
+            `user_id` LARGEINT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
+            `city` VARCHAR(20) COMMENT "用户所在城市",
+            `age` SMALLINT COMMENT "用户年龄",
+            `sex` TINYINT COMMENT "用户性别",
+            `bool_col` boolean COMMENT "",
+            `int_col` int COMMENT "",
+            `bigint_col` bigint COMMENT "",
+            `largeint_col` largeint COMMENT "",
+            `float_col` float COMMENT "",
+            `double_col` double COMMENT "",
+            `char_col` CHAR(10) COMMENT "",
+            `decimal_col` decimal COMMENT "",
+            `ipv4_col` ipv4 COMMENT "",
+            `ipv6_col` ipv6 COMMENT ""
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+           logger.info("Begin to insert into ${table_load_name} from local()")
+           sql """
+                insert into ${table_load_name}
+                select *
+                from local(
+                    "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "compress_type" = "gz");
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
+        }
+
+        qt_select_load5 """ SELECT * FROM ${table_load_name} t ORDER BY 
user_id; """
+    
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+        delete_files.call("${outFilePath}")
+    }
+
     try_sql("DROP TABLE IF EXISTS ${table_export_name}")
 }
diff --git a/regression-test/suites/export_p0/test_outfile_csv_compress.groovy 
b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
new file mode 100644
index 00000000000..6bdbb39fe75
--- /dev/null
+++ b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_csv_compress", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def create_table = {table_name -> 
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                `id` int,
+                `name` varchar(128) NOT NULL COMMENT ""
+                )
+            DISTRIBUTED BY HASH(name) PROPERTIES("replication_num" = "1");
+        """
+        sql """ INSERT INTO ${table_name} values(1, 'zhangsan');"""
+        for (int i = 0; i < 20; i++) {
+            sql """ insert into ${table_name} select id + ${i}, concat(name, 
id + ${i}) from ${table_name};"""
+        }
+    }
+
+    def table_name = "test_outfile_csv_compress"
+    create_table(table_name)
+
+    def outFilePath = """s3://${bucket}/outfile_"""
+    def csv_outfile_result = { the_table_name, compression_type ->
+        def result = sql """
+                select * from ${the_table_name}
+                into outfile "${outFilePath}"
+                FORMAT AS CSV
+                PROPERTIES(
+                    "s3.endpoint" = "${s3_endpoint}",
+                    "s3.region" = "${region}",
+                    "s3.secret_key"="${sk}",
+                    "s3.access_key" = "${ak}",
+                    "compress_type" = "${compression_type}"
+                );
+            """
+        return result[0][3]
+    }
+
+    for (String compression_type: ["plain", "gz", "bz2", "snappyblock", 
"lz4block", "zstd"]) {
+        def outfile_url = csv_outfile_result(table_name, compression_type);
+        print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + 
bucket.length(), outfile_url.length() - 1)}0.")
+        qt_select """ select c1, c2 from s3(
+                    "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}*",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
+                    "format" = "csv",
+                    "provider" = "${getS3Provider()}",
+                    "region" = "${region}",
+                    "compress_type" = "${compression_type}"
+                ) order by c1, c2 limit 10;
+                """
+        qt_select """ select count(c1), count(c2) from s3(
+                    "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}*",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
+                    "format" = "csv",
+                    "provider" = "${getS3Provider()}",
+                    "region" = "${region}",
+                    "compress_type" = "${compression_type}"
+                );
+                """
+        qt_select """desc function s3(
+                    "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}*",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
+                    "format" = "csv",
+                    "provider" = "${getS3Provider()}",
+                    "region" = "${region}",
+                    "compress_type" = "${compression_type}"
+                );
+                """
+    }
+
+    // test invalid compression_type
+    test {
+        sql """
+                select * from ${table_name}
+                into outfile "${outFilePath}"
+                FORMAT AS CSV
+                PROPERTIES(
+                    "s3.endpoint" = "${s3_endpoint}",
+                    "s3.region" = "${region}",
+                    "s3.secret_key"="${sk}",
+                    "s3.access_key" = "${ak}",
+                    "compress_type" = "invalid"
+                );
+            """
+        exception """Unknown compression type"""
+    }
+
+    // test empty table
+    sql """drop table if exists test_outfile_csv_compress_empty_table"""
+    sql """create table test_outfile_csv_compress_empty_table(k1 int) 
distributed by hash(k1) buckets 1 properties("replication_num" = "1")"""
+    def empty_outfile_url = 
csv_outfile_result("test_outfile_csv_compress_empty_table", "gz");
+    qt_select """desc function s3(
+                "uri" = 
"http://${bucket}.${s3_endpoint}${empty_outfile_url.substring(5 + 
bucket.length(), empty_outfile_url.length() - 1)}*",
+                "ACCESS_KEY"= "${ak}",
+                "SECRET_KEY" = "${sk}",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "region" = "${region}",
+                "compress_type" = "gz"
+            );
+            """
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to