This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 85ea37d4a2d [fix](iceberg)Support LZ4 compression for iceberg/hive Parquet/ORC writers. (#64723)
85ea37d4a2d is described below
commit 85ea37d4a2dbfd305ea453bcb91c4ba424b924d5
Author: daidai <[email protected]>
AuthorDate: Mon Jun 29 10:35:51 2026 +0800
[fix](iceberg)Support LZ4 compression for iceberg/hive Parquet/ORC writers. (#64723)
---
.../writer/iceberg/viceberg_delete_file_writer.cpp | 4 +
.../writer/iceberg/viceberg_partition_writer.cpp | 10 ++
be/src/exec/sink/writer/vhive_partition_writer.cpp | 5 +
be/src/format/transformer/vorc_transformer.cpp | 7 +
be/src/format/transformer/vparquet_transformer.cpp | 11 ++
.../org/apache/doris/datasource/hive/HiveUtil.java | 6 +-
gensrc/thrift/DataSinks.thrift | 5 +
.../external_table_p0/hive/ddl/test_hive_ddl.out | 9 ++
.../test_iceberg_write_parquet_compression.out | 44 ++++++
.../hive/ddl/test_hive_ddl.groovy | 22 ++-
.../test_iceberg_write_parquet_compression.groovy | 155 +++++++++++++++++++++
11 files changed, 275 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
index 968ac987e9a..0225d3a8457 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
@@ -93,6 +93,10 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state,
RuntimeProfile* profi
case TFileCompressType::ZSTD:
parquet_compression_type = TParquetCompressionType::ZSTD;
break;
+ case TFileCompressType::LZ4BLOCK:
+ // Hadoop-framed Parquet LZ4 (not LZ4_RAW) for cross-engine compatibility.
+ parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
+ break;
default:
return Status::InternalError("Unsupported compress type {} with
parquet",
to_string(_compress_type));
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 8766c945f5c..434488266bb 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -80,6 +80,12 @@ Status VIcebergPartitionWriter::open(RuntimeState* state,
RuntimeProfile* profil
parquet_compression_type = TParquetCompressionType::ZSTD;
break;
}
+ case TFileCompressType::LZ4BLOCK: {
+ // Map Doris LZ4 to the Hadoop-framed Parquet LZ4 codec (not LZ4_RAW) so the file
+ // stays readable across Spark/Iceberg/Trino. See ParquetBuildHelper.
+ parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
+ break;
+ }
default: {
return Status::InternalError("Unsupported compress type {} with
parquet",
to_string(_compress_type));
@@ -179,6 +185,10 @@ std::string VIcebergPartitionWriter::_get_file_extension(
compress_name = ".zstd";
break;
}
+ case TFileCompressType::LZ4BLOCK: {
+ compress_name = ".lz4";
+ break;
+ }
default: {
compress_name = "";
break;
diff --git a/be/src/exec/sink/writer/vhive_partition_writer.cpp
b/be/src/exec/sink/writer/vhive_partition_writer.cpp
index 686732b4712..4bfc728c8c9 100644
--- a/be/src/exec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/exec/sink/writer/vhive_partition_writer.cpp
@@ -83,6 +83,11 @@ Status VHivePartitionWriter::open(RuntimeState* state,
RuntimeProfile* operator_
parquet_compression_type = TParquetCompressionType::ZSTD;
break;
}
+ case TFileCompressType::LZ4BLOCK: {
+ // Hadoop-framed Parquet LZ4 (not LZ4_RAW) for cross-engine compatibility.
+ parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
+ break;
+ }
default: {
return Status::InternalError("Unsupported hive compress type {}
with parquet",
to_string(_hive_compress_type));
diff --git a/be/src/format/transformer/vorc_transformer.cpp
b/be/src/format/transformer/vorc_transformer.cpp
index eb216828583..7dfa9fb4c64 100644
--- a/be/src/format/transformer/vorc_transformer.cpp
+++ b/be/src/format/transformer/vorc_transformer.cpp
@@ -183,6 +183,13 @@ void VOrcTransformer::set_compression_type(const
TFileCompressType::type& compre
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZSTD);
break;
}
+ case TFileCompressType::LZ4BLOCK: {
+ // ORC has a single, unambiguous LZ4 codec (raw LZ4 inside ORC's own compression
+ // framing), interoperable with Spark/Trino. Honor the requested codec instead of
+ // silently falling back to ZLIB below.
+
_write_options->setCompression(orc::CompressionKind::CompressionKind_LZ4);
+ break;
+ }
default: {
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB);
}
diff --git a/be/src/format/transformer/vparquet_transformer.cpp
b/be/src/format/transformer/vparquet_transformer.cpp
index 8c90713ab29..b073dcf3f9f 100644
--- a/be/src/format/transformer/vparquet_transformer.cpp
+++ b/be/src/format/transformer/vparquet_transformer.cpp
@@ -123,6 +123,17 @@ void ParquetBuildHelper::build_compression_type(
builder.compression(arrow::Compression::LZ4);
break;
}
+ case TParquetCompressionType::LZ4_HADOOP: {
+ constexpr int64_t HADOOP_LZ4_DEFAULT_BUFFER_SIZE = 256 * 1024;
+ // Hadoop-framed LZ4 -> Parquet thrift codec "LZ4" (deprecated). This matches what
+ // Spark/Iceberg writes for `write.parquet.compression-codec=lz4`. Arrow 17 emits one
+ // Hadoop LZ4 block per Parquet page/dictionary page, while Hadoop JVM readers default to
+ // a 256 KiB LZ4 codec buffer, so keep page targets below that buffer size. Only half the
+ // buffer is used, leaving headroom because Arrow treats these page sizes as targets
+ // rather than hard caps.
+ builder.compression(arrow::Compression::LZ4_HADOOP);
+ builder.data_pagesize(HADOOP_LZ4_DEFAULT_BUFFER_SIZE / 2);
+ builder.dictionary_pagesize_limit(HADOOP_LZ4_DEFAULT_BUFFER_SIZE / 2);
+ break;
+ }
// arrow do not support lzo and bz2 compression type.
// case TParquetCompressionType::LZO: {
// builder.compression(arrow::Compression::LZO);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index 260e7e0f12f..45e26a084f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -64,8 +64,10 @@ import java.util.stream.Collectors;
public final class HiveUtil {
public static final String COMPRESSION_KEY = "compression";
- public static final Set<String> SUPPORTED_ORC_COMPRESSIONS =
ImmutableSet.of("plain", "zlib", "snappy", "zstd");
- public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS =
ImmutableSet.of("plain", "snappy", "zstd");
+ public static final Set<String> SUPPORTED_ORC_COMPRESSIONS =
+ ImmutableSet.of("plain", "zlib", "snappy", "zstd", "lz4");
+ public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS =
+ ImmutableSet.of("plain", "snappy", "zstd", "lz4");
public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 3212313183b..1a4a7377d79 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -63,6 +63,11 @@ enum TParquetCompressionType {
LZO = 5,
BZ2 = 6,
UNCOMPRESSED = 7,
+ // Hadoop-framed (deprecated Parquet "LZ4") codec. Distinct from LZ4 above, which maps to
+ // the Parquet LZ4_RAW codec. Used by the Iceberg/Hive Parquet writers so the output stays
+ // readable by engines that support only the Hadoop-framed LZ4 codec (e.g. Trino), matching
+ // what Spark/Iceberg writes for `write.parquet.compression-codec=lz4`.
+ LZ4_HADOOP = 8,
}
enum TParquetVersion {
diff --git a/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
b/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
index 96d17545c9b..daff25f4285 100644
--- a/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
+++ b/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
@@ -29,6 +29,12 @@ true 1 1000 2.3
true 1 1000 2.3
true 1 1000 2.3
+-- !hive_parquet_lz4_write --
+doris_lz4
+
+-- !hive_parquet_lz4_footer_codec --
+LZ4
+
-- !insert01 --
true 123 9876543210 abcdefghij 3.14 6.28 123.4567
varcharval stringval
@@ -59,3 +65,6 @@ true 1 1000 2.3
true 1 1000 2.3
true 1 1000 2.3
+-- !hive_orc_lz4_write --
+doris_lz4
+
diff --git
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.out
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.out
new file mode 100644
index 00000000000..cfa3e4ded04
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.out
@@ -0,0 +1,44 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !iceberg_parquet_lz4_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_snappy_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_zstd_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_uncompressed_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_lz4_footer_codec --
+LZ4
+
+-- !iceberg_orc_lz4_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_orc_lz4_files --
+orc 3
+
+-- !iceberg_parquet_lz4_delete --
+1
+
+-- !iceberg_parquet_lz4_delete_data --
+1 a
+3 c
+
+-- !iceberg_parquet_lz4_delete_files --
+parquet 1
+
+-- !iceberg_parquet_lz4_delete_footer_codec --
+LZ4
diff --git
a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
index 31979501422..f274be0bf92 100644
--- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
+++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
@@ -399,6 +399,26 @@ suite("test_hive_ddl", "p0,external") {
throw new Exception("Invalid compression type: ${compression}
for tbl_${file_format}_${compression}")
}
+ if (compression.equals("lz4")) {
+ sql """ INSERT INTO tbl_${file_format}_${compression} VALUES
('doris_lz4') """
+ def q_lz4 = "order_qt_hive_${file_format}_${compression}_write"
+ "${q_lz4}" """ SELECT * FROM tbl_${file_format}_${compression}
ORDER BY col """
+
+ if (file_format.equals("parquet")) {
+ String hdfsPort =
context.config.otherConfigs.get("hive2HdfsPort")
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ order_qt_hive_parquet_lz4_footer_codec """
+ SELECT DISTINCT compression
+ FROM parquet_meta(
+ "uri" =
"hdfs://${externalEnvIp}:${hdfsPort}/user/hive/warehouse/test_hive_compress.db/tbl_parquet_lz4/*",
+ "hadoop.username" = "doris",
+ "mode" = "parquet_metadata"
+ )
+ ORDER BY compression
+ """
+ }
+ }
+
sql """DROP TABLE `tbl_${file_format}_${compression}`"""
sql """ drop database if exists `test_hive_compress` """;
}
@@ -734,7 +754,7 @@ suite("test_hive_ddl", "p0,external") {
sql """set enable_fallback_to_original_planner=false;"""
test_db(catalog_name)
test_loc_db(externalEnvIp, hdfs_port, catalog_name)
- def compressions = ["snappy", "zlib", "zstd"]
+ def compressions = ["snappy", "zlib", "zstd", "lz4"]
for (String file_format in file_formats) {
logger.info("Process file format " + file_format)
test_loc_tbl(file_format, externalEnvIp, hdfs_port,
catalog_name)
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.groovy
new file mode 100644
index 00000000000..a0bcdafbfd7
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.groovy
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Verifies that Doris writes Iceberg Parquet LZ4 as Hadoop-framed Parquet "LZ4"
+// (not LZ4_RAW) and that Iceberg ORC/delete-file LZ4 paths are reachable.
+suite("test_iceberg_write_parquet_compression", "p0,external") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "test_iceberg_write_parquet_compression"
+ String db = catalog_name + "_db"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """ switch ${catalog_name} """
+ sql """ drop database if exists ${db} force """
+ sql """ create database ${db} """
+
+ // Codecs Doris is expected to support for Parquet writes.
+ for (String codec : ["lz4", "snappy", "zstd", "uncompressed"]) {
+ String tbl = "${db}.tbl_${codec}"
+ sql """ drop table if exists ${tbl} """
+ sql """
+ CREATE TABLE ${tbl} (a STRING, b BIGINT) PROPERTIES (
+ 'write-format' = 'parquet',
+ 'write.format.default' = 'parquet',
+ 'format-version' = '2',
+ 'write.parquet.compression-codec' = '${codec}'
+ )"""
+
+ sql """ INSERT INTO ${tbl} VALUES ('doris0', 0), ('doris1', 1),
('doris2', 2) """
+
+ def q_data = "order_qt_iceberg_parquet_${codec}_data"
+ "${q_data}" """ SELECT a, b FROM ${tbl} ORDER BY b """
+ }
+
+ def lz4DataFiles = sql """ SELECT file_path FROM ${db}.tbl_lz4\$files
ORDER BY file_path """
+ String lz4DataFile = lz4DataFiles[0][0].toString()
+ order_qt_iceberg_parquet_lz4_footer_codec """
+ SELECT DISTINCT compression
+ FROM parquet_meta(
+ "uri" = "${lz4DataFile}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1",
+ "mode" = "parquet_metadata"
+ )
+ ORDER BY compression
+ """
+
+ sql """ DROP TABLE IF EXISTS ${db}.tbl_orc_lz4 """
+ sql """
+ CREATE TABLE ${db}.tbl_orc_lz4 (a STRING, b BIGINT) PROPERTIES (
+ 'write-format' = 'orc',
+ 'write.format.default' = 'orc',
+ 'format-version' = '2',
+ 'write.orc.compression-codec' = 'lz4'
+ )"""
+ sql """ INSERT INTO ${db}.tbl_orc_lz4 VALUES ('doris0', 0), ('doris1', 1),
('doris2', 2) """
+ order_qt_iceberg_orc_lz4_data """ SELECT a, b FROM ${db}.tbl_orc_lz4 ORDER
BY b """
+ order_qt_iceberg_orc_lz4_files """
+ SELECT lower(file_format), sum(record_count)
+ FROM ${db}.tbl_orc_lz4\$files
+ GROUP BY lower(file_format)
+ ORDER BY 1
+ """
+
+ sql """ DROP TABLE IF EXISTS ${db}.tbl_lz4_delete """
+ sql """
+ CREATE TABLE ${db}.tbl_lz4_delete (id INT, name STRING) PROPERTIES (
+ 'write-format' = 'parquet',
+ 'write.format.default' = 'parquet',
+ 'format-version' = '2',
+ 'write.parquet.compression-codec' = 'lz4',
+ 'write.delete.mode' = 'merge-on-read',
+ 'write.update.mode' = 'merge-on-read',
+ 'write.merge.mode' = 'merge-on-read'
+ )"""
+ sql """ INSERT INTO ${db}.tbl_lz4_delete VALUES (1, 'a'), (2, 'b'), (3,
'c') """
+ qt_iceberg_parquet_lz4_delete """ DELETE FROM ${db}.tbl_lz4_delete WHERE
id = 2 """
+ order_qt_iceberg_parquet_lz4_delete_data """
+ SELECT id, name FROM ${db}.tbl_lz4_delete ORDER BY id
+ """
+ order_qt_iceberg_parquet_lz4_delete_files """
+ SELECT lower(file_format), sum(record_count)
+ FROM ${db}.tbl_lz4_delete\$delete_files
+ GROUP BY lower(file_format)
+ ORDER BY 1
+ """
+ def lz4DeleteFiles = sql """
+ SELECT file_path
+ FROM ${db}.tbl_lz4_delete\$delete_files
+ ORDER BY file_path
+ """
+ String lz4DeleteFile = lz4DeleteFiles[0][0].toString()
+ order_qt_iceberg_parquet_lz4_delete_footer_codec """
+ SELECT DISTINCT compression
+ FROM parquet_meta(
+ "uri" = "${lz4DeleteFile}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1",
+ "mode" = "parquet_metadata"
+ )
+ ORDER BY compression
+ """
+
+ // GZIP is intentionally not yet supported by the Doris Parquet writer; the INSERT must
+ // fail explicitly rather than silently fall back. Remove this block when GZIP support is
+ // added (and update the cross-engine product test expectation in tandem).
+ sql """ drop table if exists ${db}.tbl_gzip """
+ sql """
+ CREATE TABLE ${db}.tbl_gzip (a STRING, b BIGINT) PROPERTIES (
+ 'write-format' = 'parquet',
+ 'write.format.default' = 'parquet',
+ 'format-version' = '2',
+ 'write.parquet.compression-codec' = 'gzip'
+ )"""
+ test {
+ sql """ INSERT INTO ${db}.tbl_gzip VALUES ('doris0', 0) """
+ exception "Unsupported compress type GZ with parquet"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]