This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 00ad3f981d7 branch-4.1: [fix](iceberg)Support LZ4 compression for 
iceberg/hive Parquet/ORC writers. #64723 (#64917)
00ad3f981d7 is described below

commit 00ad3f981d721a8695d96e14887981e012cf9d4a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 30 14:15:01 2026 +0800

    branch-4.1: [fix](iceberg)Support LZ4 compression for iceberg/hive 
Parquet/ORC writers. #64723 (#64917)
    
    Cherry-picked from #64723
    
    Co-authored-by: daidai <[email protected]>
---
 .../writer/iceberg/viceberg_delete_file_writer.cpp |   4 +
 .../writer/iceberg/viceberg_partition_writer.cpp   |  10 ++
 be/src/exec/sink/writer/vhive_partition_writer.cpp |   5 +
 be/src/format/transformer/vorc_transformer.cpp     |   7 +
 be/src/format/transformer/vparquet_transformer.cpp |  11 ++
 .../org/apache/doris/datasource/hive/HiveUtil.java |   6 +-
 gensrc/thrift/DataSinks.thrift                     |   5 +
 .../external_table_p0/hive/ddl/test_hive_ddl.out   |   9 ++
 .../test_iceberg_write_parquet_compression.out     |  44 ++++++
 .../hive/ddl/test_hive_ddl.groovy                  |  22 ++-
 .../test_iceberg_write_parquet_compression.groovy  | 155 +++++++++++++++++++++
 11 files changed, 275 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp 
b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
index 968ac987e9a..0225d3a8457 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
@@ -93,6 +93,10 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state, 
RuntimeProfile* profi
         case TFileCompressType::ZSTD:
             parquet_compression_type = TParquetCompressionType::ZSTD;
             break;
+        case TFileCompressType::LZ4BLOCK:
+            // Hadoop-framed Parquet LZ4 (not LZ4_RAW) for cross-engine 
compatibility.
+            parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
+            break;
         default:
             return Status::InternalError("Unsupported compress type {} with 
parquet",
                                          to_string(_compress_type));
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 8766c945f5c..434488266bb 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -80,6 +80,12 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profil
             parquet_compression_type = TParquetCompressionType::ZSTD;
             break;
         }
+        case TFileCompressType::LZ4BLOCK: {
+            // Map Doris LZ4 to the Hadoop-framed Parquet LZ4 codec (not 
LZ4_RAW) so the file
+            // stays readable across Spark/Iceberg/Trino. See 
ParquetBuildHelper.
+            parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
+            break;
+        }
         default: {
             return Status::InternalError("Unsupported compress type {} with 
parquet",
                                          to_string(_compress_type));
@@ -179,6 +185,10 @@ std::string VIcebergPartitionWriter::_get_file_extension(
         compress_name = ".zstd";
         break;
     }
+    case TFileCompressType::LZ4BLOCK: {
+        compress_name = ".lz4";
+        break;
+    }
     default: {
         compress_name = "";
         break;
diff --git a/be/src/exec/sink/writer/vhive_partition_writer.cpp 
b/be/src/exec/sink/writer/vhive_partition_writer.cpp
index 686732b4712..4bfc728c8c9 100644
--- a/be/src/exec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/exec/sink/writer/vhive_partition_writer.cpp
@@ -83,6 +83,11 @@ Status VHivePartitionWriter::open(RuntimeState* state, 
RuntimeProfile* operator_
             parquet_compression_type = TParquetCompressionType::ZSTD;
             break;
         }
+        case TFileCompressType::LZ4BLOCK: {
+            // Hadoop-framed Parquet LZ4 (not LZ4_RAW) for cross-engine 
compatibility.
+            parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
+            break;
+        }
         default: {
             return Status::InternalError("Unsupported hive compress type {} 
with parquet",
                                          to_string(_hive_compress_type));
diff --git a/be/src/format/transformer/vorc_transformer.cpp 
b/be/src/format/transformer/vorc_transformer.cpp
index 87e066b6c49..3e3949cd924 100644
--- a/be/src/format/transformer/vorc_transformer.cpp
+++ b/be/src/format/transformer/vorc_transformer.cpp
@@ -184,6 +184,13 @@ void VOrcTransformer::set_compression_type(const 
TFileCompressType::type& compre
         
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZSTD);
         break;
     }
+    case TFileCompressType::LZ4BLOCK: {
+        // ORC has a single, unambiguous LZ4 codec (raw LZ4 inside ORC's own 
compression
+        // framing), interoperable with Spark/Trino. Honor the requested codec 
instead of
+        // silently falling back to ZLIB below.
+        
_write_options->setCompression(orc::CompressionKind::CompressionKind_LZ4);
+        break;
+    }
     default: {
         
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB);
     }
diff --git a/be/src/format/transformer/vparquet_transformer.cpp 
b/be/src/format/transformer/vparquet_transformer.cpp
index 4d89261a379..0b25059b880 100644
--- a/be/src/format/transformer/vparquet_transformer.cpp
+++ b/be/src/format/transformer/vparquet_transformer.cpp
@@ -124,6 +124,17 @@ void ParquetBuildHelper::build_compression_type(
         builder.compression(arrow::Compression::LZ4);
         break;
     }
+    case TParquetCompressionType::LZ4_HADOOP: {
+        constexpr int64_t HADOOP_LZ4_DEFAULT_BUFFER_SIZE = 256 * 1024;
+        // Hadoop-framed LZ4 -> Parquet thrift codec "LZ4" (deprecated). This 
matches what
+        // Spark/Iceberg writes for `write.parquet.compression-codec=lz4`. 
Arrow 17 emits one
+        // Hadoop LZ4 block per Parquet page/dictionary page, while Hadoop JVM 
readers default to
+        // a 256 KiB LZ4 codec buffer, so keep page targets below that buffer 
size.
+        builder.compression(arrow::Compression::LZ4_HADOOP);
+        builder.data_pagesize(HADOOP_LZ4_DEFAULT_BUFFER_SIZE / 2);
+        builder.dictionary_pagesize_limit(HADOOP_LZ4_DEFAULT_BUFFER_SIZE / 2);
+        break;
+    }
     // arrow do not support lzo and bz2 compression type.
     // case TParquetCompressionType::LZO: {
     //     builder.compression(arrow::Compression::LZO);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index 059a2445d1b..f87f7a02c6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -65,8 +65,10 @@ import java.util.stream.Collectors;
 public final class HiveUtil {
 
     public static final String COMPRESSION_KEY = "compression";
-    public static final Set<String> SUPPORTED_ORC_COMPRESSIONS = 
ImmutableSet.of("plain", "zlib", "snappy", "zstd");
-    public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS = 
ImmutableSet.of("plain", "snappy", "zstd");
+    public static final Set<String> SUPPORTED_ORC_COMPRESSIONS =
+            ImmutableSet.of("plain", "zlib", "snappy", "zstd", "lz4");
+    public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS =
+            ImmutableSet.of("plain", "snappy", "zstd", "lz4");
     public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
             ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");
 
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 0d6d9126a06..058bab50491 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -63,6 +63,11 @@ enum TParquetCompressionType {
     LZO = 5,
     BZ2 = 6,
     UNCOMPRESSED = 7,
+    // Hadoop-framed (deprecated Parquet "LZ4") codec. Distinct from LZ4 
above, which maps to
+    // Arrow LZ4_RAW. Used by the Iceberg/Hive Parquet writers so the output 
stays readable by
+    // engines that support only the Hadoop-framed LZ4 codec (e.g. Trino), 
matching what
+    // Spark/Iceberg writes for `write.parquet.compression-codec=lz4`.
+    LZ4_HADOOP = 8,
 }
 
 enum TParquetVersion {
diff --git a/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out 
b/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
index 96d17545c9b..daff25f4285 100644
--- a/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
+++ b/regression-test/data/external_table_p0/hive/ddl/test_hive_ddl.out
@@ -29,6 +29,12 @@ true 1       1000    2.3
 true   1       1000    2.3
 true   1       1000    2.3
 
+-- !hive_parquet_lz4_write --
+doris_lz4
+
+-- !hive_parquet_lz4_footer_codec --
+LZ4
+
 -- !insert01 --
 true   123     9876543210      abcdefghij      3.14    6.28    123.4567        
varcharval      stringval
 
@@ -59,3 +65,6 @@ true  1       1000    2.3
 true   1       1000    2.3
 true   1       1000    2.3
 
+-- !hive_orc_lz4_write --
+doris_lz4
+
diff --git 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.out
 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.out
new file mode 100644
index 00000000000..cfa3e4ded04
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.out
@@ -0,0 +1,44 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !iceberg_parquet_lz4_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_snappy_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_zstd_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_uncompressed_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_parquet_lz4_footer_codec --
+LZ4
+
+-- !iceberg_orc_lz4_data --
+doris0 0
+doris1 1
+doris2 2
+
+-- !iceberg_orc_lz4_files --
+orc    3
+
+-- !iceberg_parquet_lz4_delete --
+1
+
+-- !iceberg_parquet_lz4_delete_data --
+1      a
+3      c
+
+-- !iceberg_parquet_lz4_delete_files --
+parquet        1
+
+-- !iceberg_parquet_lz4_delete_footer_codec --
+LZ4
diff --git 
a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy 
b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
index 43597b4557f..a3bf1410fbf 100644
--- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
+++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy
@@ -399,6 +399,26 @@ suite("test_hive_ddl", 
"p0,external,hive,external_docker,external_docker_hive")
                 throw new Exception("Invalid compression type: ${compression} 
for tbl_${file_format}_${compression}")
             }
 
+            if (compression.equals("lz4")) {
+                sql """ INSERT INTO tbl_${file_format}_${compression} VALUES 
('doris_lz4') """
+                def q_lz4 = "order_qt_hive_${file_format}_${compression}_write"
+                "${q_lz4}" """ SELECT * FROM tbl_${file_format}_${compression} 
ORDER BY col """
+
+                if (file_format.equals("parquet")) {
+                    String hdfsPort = 
context.config.otherConfigs.get("hive2HdfsPort")
+                    String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+                    order_qt_hive_parquet_lz4_footer_codec """
+                        SELECT DISTINCT compression
+                        FROM parquet_meta(
+                            "uri" = 
"hdfs://${externalEnvIp}:${hdfsPort}/user/hive/warehouse/test_hive_compress.db/tbl_parquet_lz4/*",
+                            "hadoop.username" = "doris",
+                            "mode" = "parquet_metadata"
+                        )
+                        ORDER BY compression
+                    """
+                }
+            }
+
             sql """DROP TABLE `tbl_${file_format}_${compression}`"""
             sql """ drop database if exists `test_hive_compress` """;
         }
@@ -734,7 +754,7 @@ suite("test_hive_ddl", 
"p0,external,hive,external_docker,external_docker_hive")
             sql """set enable_fallback_to_original_planner=false;"""
             test_db(catalog_name)
             test_loc_db(externalEnvIp, hdfs_port, catalog_name)
-            def compressions = ["snappy", "zlib", "zstd"]
+            def compressions = ["snappy", "zlib", "zstd", "lz4"]
             for (String file_format in file_formats) {
                 logger.info("Process file format " + file_format)
                 test_loc_tbl(file_format, externalEnvIp, hdfs_port, 
catalog_name)
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.groovy
new file mode 100644
index 00000000000..a0bcdafbfd7
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_parquet_compression.groovy
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Verifies that Doris writes Iceberg Parquet LZ4 as Hadoop-framed Parquet 
"LZ4"
+// (not LZ4_RAW) and that Iceberg ORC/delete-file LZ4 paths are reachable.
+suite("test_iceberg_write_parquet_compression", "p0,external") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "test_iceberg_write_parquet_compression"
+    String db = catalog_name + "_db"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """ switch ${catalog_name} """
+    sql """ drop database if exists ${db} force """
+    sql """ create database ${db} """
+
+    // Codecs Doris is expected to support for Parquet writes.
+    for (String codec : ["lz4", "snappy", "zstd", "uncompressed"]) {
+        String tbl = "${db}.tbl_${codec}"
+        sql """ drop table if exists ${tbl} """
+        sql """
+        CREATE TABLE ${tbl} (a STRING, b BIGINT) PROPERTIES (
+            'write-format' = 'parquet',
+            'write.format.default' = 'parquet',
+            'format-version' = '2',
+            'write.parquet.compression-codec' = '${codec}'
+        )"""
+
+        sql """ INSERT INTO ${tbl} VALUES ('doris0', 0), ('doris1', 1), 
('doris2', 2) """
+
+        def q_data = "order_qt_iceberg_parquet_${codec}_data"
+        "${q_data}" """ SELECT a, b FROM ${tbl} ORDER BY b """
+    }
+
+    def lz4DataFiles = sql """ SELECT file_path FROM ${db}.tbl_lz4\$files 
ORDER BY file_path """
+    String lz4DataFile = lz4DataFiles[0][0].toString()
+    order_qt_iceberg_parquet_lz4_footer_codec """
+        SELECT DISTINCT compression
+        FROM parquet_meta(
+            "uri" = "${lz4DataFile}",
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+            "s3.region" = "us-east-1",
+            "mode" = "parquet_metadata"
+        )
+        ORDER BY compression
+    """
+
+    sql """ DROP TABLE IF EXISTS ${db}.tbl_orc_lz4 """
+    sql """
+    CREATE TABLE ${db}.tbl_orc_lz4 (a STRING, b BIGINT) PROPERTIES (
+        'write-format' = 'orc',
+        'write.format.default' = 'orc',
+        'format-version' = '2',
+        'write.orc.compression-codec' = 'lz4'
+    )"""
+    sql """ INSERT INTO ${db}.tbl_orc_lz4 VALUES ('doris0', 0), ('doris1', 1), 
('doris2', 2) """
+    order_qt_iceberg_orc_lz4_data """ SELECT a, b FROM ${db}.tbl_orc_lz4 ORDER 
BY b """
+    order_qt_iceberg_orc_lz4_files """
+        SELECT lower(file_format), sum(record_count)
+        FROM ${db}.tbl_orc_lz4\$files
+        GROUP BY lower(file_format)
+        ORDER BY 1
+    """
+
+    sql """ DROP TABLE IF EXISTS ${db}.tbl_lz4_delete """
+    sql """
+    CREATE TABLE ${db}.tbl_lz4_delete (id INT, name STRING) PROPERTIES (
+        'write-format' = 'parquet',
+        'write.format.default' = 'parquet',
+        'format-version' = '2',
+        'write.parquet.compression-codec' = 'lz4',
+        'write.delete.mode' = 'merge-on-read',
+        'write.update.mode' = 'merge-on-read',
+        'write.merge.mode' = 'merge-on-read'
+    )"""
+    sql """ INSERT INTO ${db}.tbl_lz4_delete VALUES (1, 'a'), (2, 'b'), (3, 
'c') """
+    qt_iceberg_parquet_lz4_delete """ DELETE FROM ${db}.tbl_lz4_delete WHERE 
id = 2 """
+    order_qt_iceberg_parquet_lz4_delete_data """
+        SELECT id, name FROM ${db}.tbl_lz4_delete ORDER BY id
+    """
+    order_qt_iceberg_parquet_lz4_delete_files """
+        SELECT lower(file_format), sum(record_count)
+        FROM ${db}.tbl_lz4_delete\$delete_files
+        GROUP BY lower(file_format)
+        ORDER BY 1
+    """
+    def lz4DeleteFiles = sql """
+        SELECT file_path
+        FROM ${db}.tbl_lz4_delete\$delete_files
+        ORDER BY file_path
+    """
+    String lz4DeleteFile = lz4DeleteFiles[0][0].toString()
+    order_qt_iceberg_parquet_lz4_delete_footer_codec """
+        SELECT DISTINCT compression
+        FROM parquet_meta(
+            "uri" = "${lz4DeleteFile}",
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+            "s3.region" = "us-east-1",
+            "mode" = "parquet_metadata"
+        )
+        ORDER BY compression
+    """
+
+    // GZIP is intentionally not yet supported by the Doris Parquet writer; 
the INSERT must
+    // fail explicitly rather than silently fall back. Remove this block when 
GZIP support is
+    // added (and update the cross-engine product test expectation in tandem).
+    sql """ drop table if exists ${db}.tbl_gzip """
+    sql """
+    CREATE TABLE ${db}.tbl_gzip (a STRING, b BIGINT) PROPERTIES (
+        'write-format' = 'parquet',
+        'write.format.default' = 'parquet',
+        'format-version' = '2',
+        'write.parquet.compression-codec' = 'gzip'
+    )"""
+    test {
+        sql """ INSERT INTO ${db}.tbl_gzip VALUES ('doris0', 0) """
+        exception "Unsupported compress type GZ with parquet"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to