This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 81aa5bea1a7 [opt](outfile) support compressed csv with names and types
in outfile (#58090)
81aa5bea1a7 is described below
commit 81aa5bea1a7f1cb5c3f277cbe0e6bc949b24ae0c
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Nov 20 16:43:10 2025 +0800
[opt](outfile) support compressed csv with names and types in outfile
(#58090)
### What problem does this PR solve?
Related PR: #55392
Problem Summary:
Support compression when outfile with `csv_with_names` and
`csv_with_names_and_types`
---
be/src/vec/runtime/vcsv_transformer.cpp | 19 +-
.../data/export_p0/test_outfile_csv_compress.out | 240 +++++++++++++++++++++
.../export_p0/test_outfile_csv_compress.groovy | 139 ++++++++----
3 files changed, 357 insertions(+), 41 deletions(-)
diff --git a/be/src/vec/runtime/vcsv_transformer.cpp
b/be/src/vec/runtime/vcsv_transformer.cpp
index 0821915e578..9fdc02c4523 100644
--- a/be/src/vec/runtime/vcsv_transformer.cpp
+++ b/be/src/vec/runtime/vcsv_transformer.cpp
@@ -86,17 +86,26 @@ VCSVTransformer::VCSVTransformer(RuntimeState* state,
doris::io::FileWriter* fil
Status VCSVTransformer::open() {
RETURN_IF_ERROR(get_block_compression_codec(_compress_type,
&_compress_codec));
if (_with_bom) {
+ Slice bom_slice(reinterpret_cast<const char*>(bom), sizeof(bom));
if (_compress_codec) {
- return Status::InternalError("compressed csv with bom is not
supported yet");
+ faststring compressed_data;
+ RETURN_IF_ERROR(_compress_codec->compress(bom_slice,
&compressed_data));
+ RETURN_IF_ERROR(
+ _file_writer->append(Slice(compressed_data.data(),
compressed_data.size())));
+ } else {
+ RETURN_IF_ERROR(_file_writer->append(bom_slice));
}
- RETURN_IF_ERROR(
- _file_writer->append(Slice(reinterpret_cast<const char*>(bom),
sizeof(bom))));
}
if (!_csv_header.empty()) {
+ Slice header(_csv_header.data(), _csv_header.size());
if (_compress_codec) {
- return Status::InternalError("compressed csv with header is not
supported yet");
+ faststring compressed_data;
+ RETURN_IF_ERROR(_compress_codec->compress(header,
&compressed_data));
+ RETURN_IF_ERROR(
+ _file_writer->append(Slice(compressed_data.data(),
compressed_data.size())));
+ } else {
+ return _file_writer->append(header);
}
- return _file_writer->append(Slice(_csv_header.data(),
_csv_header.size()));
}
return Status::OK();
}
diff --git a/regression-test/data/export_p0/test_outfile_csv_compress.out
b/regression-test/data/export_p0/test_outfile_csv_compress.out
index 7d3965e8974..aa3c8fb7c25 100644
--- a/regression-test/data/export_p0/test_outfile_csv_compress.out
+++ b/regression-test/data/export_p0/test_outfile_csv_compress.out
@@ -113,6 +113,234 @@ c2 text Yes false \N NONE
c1 text Yes false \N NONE
c2 text Yes false \N NONE
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
+-- !select --
+1048576 1048576
+
+-- !select --
+id text Yes false \N NONE
+name text Yes false \N NONE
+
-- !select --
1 2
@@ -176,3 +404,15 @@ c2 text Yes false \N NONE
-- !select --
__dummy_col text Yes false \N NONE
+-- !select --
+1 zhangsan
+1 zhangsan1
+10 zhangsan10
+10 zhangsan110
+10 zhangsan1210
+10 zhangsan12410
+10 zhangsan12510
+10 zhangsan1310
+10 zhangsan13610
+10 zhangsan1410
+
diff --git a/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
index 01e5f066440..c780fd9b1b5 100644
--- a/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
+++ b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
@@ -56,11 +56,11 @@ suite("test_outfile_csv_compress", "p0") {
create_table(table_name)
def outFilePath = """s3://${bucket}/outfile_"""
- def csv_outfile_result = { the_table_name, compression_type ->
+ def csv_outfile_result = { the_table_name, compression_type, format_type ->
def result = sql """
select * from ${the_table_name}
into outfile "${outFilePath}"
- FORMAT AS CSV
+ FORMAT AS ${format_type}
PROPERTIES(
"s3.endpoint" = "${s3_endpoint}",
"s3.region" = "${region}",
@@ -73,43 +73,82 @@ suite("test_outfile_csv_compress", "p0") {
}
for (String compression_type: ["plain", "gz", "bz2", "snappyblock",
"lz4block", "zstd"]) {
- def outfile_url = csv_outfile_result(table_name, compression_type);
- print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 +
bucket.length(), outfile_url.length() - 1)}0.")
- qt_select """ select c1, c2 from s3(
- "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
- "ACCESS_KEY"= "${ak}",
- "SECRET_KEY" = "${sk}",
- "format" = "csv",
- "provider" = "${getS3Provider()}",
- "region" = "${region}",
- "compress_type" = "${compression_type}"
- ) order by c1, c2 limit 10;
- """
- qt_select """ select count(c1), count(c2) from s3(
- "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
- "ACCESS_KEY"= "${ak}",
- "SECRET_KEY" = "${sk}",
- "format" = "csv",
- "provider" = "${getS3Provider()}",
- "region" = "${region}",
- "compress_type" = "${compression_type}"
- );
- """
- qt_select """desc function s3(
- "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
- "ACCESS_KEY"= "${ak}",
- "SECRET_KEY" = "${sk}",
- "format" = "csv",
- "provider" = "${getS3Provider()}",
- "region" = "${region}",
- "compress_type" = "${compression_type}"
- );
- """
+ for (String format_type : ["csv"]) {
+ def outfile_url = csv_outfile_result(table_name, compression_type,
format_type);
+ print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 +
bucket.length(), outfile_url.length() - 1)}0.")
+ qt_select """ select c1, c2 from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format_type}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ ) order by c1, c2 limit 10;
+ """
+ qt_select """ select count(c1), count(c2) from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format_type}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ qt_select """desc function s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format_type}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ }
+ }
+
+ for (String compression_type: ["plain", "gz", "bz2", "snappyblock",
"lz4block", "zstd"]) {
+ for (String format_type : ["csv_with_names",
"csv_with_names_and_types"]) {
+ def outfile_url = csv_outfile_result(table_name, compression_type,
format_type);
+ print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 +
bucket.length(), outfile_url.length() - 1)}0.")
+ qt_select """ select id, name from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format_type}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ ) order by id, name limit 10;
+ """
+ qt_select """ select count(id), count(name) from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format_type}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ qt_select """desc function s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format_type}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ }
}
for (String compression_type: ["plain", "gz", "bz2", "snappyblock",
"lz4block", "zstd"]) {
def small = "small_${table_name}"
- def outfile_url = csv_outfile_result(small, compression_type);
+ def outfile_url = csv_outfile_result(small, compression_type, "csv");
print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 +
bucket.length(), outfile_url.length() - 1)}0.")
qt_select """ select c1, c2 from s3(
"uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
@@ -163,7 +202,7 @@ suite("test_outfile_csv_compress", "p0") {
// test empty table
sql """drop table if exists test_outfile_csv_compress_empty_table"""
sql """create table test_outfile_csv_compress_empty_table(k1 int)
distributed by hash(k1) buckets 1 properties("replication_num" = "1")"""
- def empty_outfile_url =
csv_outfile_result("test_outfile_csv_compress_empty_table", "gz");
+ def empty_outfile_url =
csv_outfile_result("test_outfile_csv_compress_empty_table", "gz", "csv");
qt_select """desc function s3(
"uri" =
"http://${bucket}.${s3_endpoint}${empty_outfile_url.substring(5 +
bucket.length(), empty_outfile_url.length() - 1)}*",
"ACCESS_KEY"= "${ak}",
@@ -174,5 +213,33 @@ suite("test_outfile_csv_compress", "p0") {
"compress_type" = "gz"
);
"""
+
+ // test bom compress
+ def result = sql """
+ select * from ${table_name}
+ into outfile "${outFilePath}"
+ FORMAT AS CSV
+ PROPERTIES(
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}",
+ "compress_type" = "gz",
+ "with_bom" = "true"
+ );
+ """
+ def outfile_url = result[0][3]
+ print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 +
bucket.length(), outfile_url.length() - 1)}0.")
+ qt_select """ select c1, c2 from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "with_bom" = "true",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "gz"
+ ) order by c1, c2 limit 10;
+ """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]