kevinjqliu commented on code in PR #1851:
URL: https://github.com/apache/iceberg-rust/pull/1851#discussion_r2530109818


##########
crates/iceberg/src/spec/manifest_list.rs:
##########
@@ -198,23 +202,52 @@ impl ManifestListWriter {
             sequence_number,
             snapshot_id,
             first_row_id,
+            CompressionSettings::default(),
         )
     }
 
+    /// Set compression settings for the manifest list file.
+    pub fn with_compression(mut self, compression: CompressionSettings) -> 
Self {
+        self.compression = compression.clone();
+
+        // Recreate the avro_writer with the new codec
+        let avro_schema = match self.format_version {
+            FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
+            FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
+            FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
+        };
+
+        // Use CompressionSettings to get codec
+        let codec = compression.to_codec();
+
+        let new_writer = Writer::with_codec(avro_schema, Vec::new(), codec);
+
+        // Copy over existing metadata from the old writer
+        // Unfortunately, we can't extract metadata from the old writer,
+        // so we'll need to handle this differently
+        self.avro_writer = new_writer;

Review Comment:
   I'm a bit confused by this comment: do we need to copy over the metadata?
   
   Similar to:
   
https://github.com/apache/iceberg-rust/blob/51e781ea5cf85e37791965a8f81bb26496bbaf1e/crates/iceberg/src/spec/manifest_list.rs#L250-L255



##########
crates/iceberg/src/spec/avro_util.rs:
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for working with Apache Avro in Iceberg.
+
+use apache_avro::Codec;
+use log::warn;
+
+/// Settings for compression codec and level.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CompressionSettings {
+    /// The compression codec name (e.g., "gzip", "zstd", "deflate", "none")
+    pub codec: String,
+    /// The compression level
+    pub level: u8,
+}
+
+impl CompressionSettings {
+    /// Create a new CompressionSettings with the specified codec and level.
+    pub fn new(codec: String, level: u8) -> Self {
+        Self { codec, level }
+    }
+
+    /// Convert to apache_avro::Codec using the codec_from_str helper function.
+    pub(crate) fn to_codec(&self) -> Codec {
+        codec_from_str(Some(&self.codec), self.level)
+    }
+}
+
+impl Default for CompressionSettings {
+    fn default() -> Self {
+        use crate::spec::TableProperties;
+        Self {
+            codec: 
TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(),
+            level: TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT,
+        }
+    }
+}
+
+/// Convert codec name and level to apache_avro::Codec.
+/// Returns Codec::Null for unknown or unsupported codecs.
+///
+/// # Arguments
+///
+/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", 
"deflate", "none")
+/// * `level` - The compression level. For deflate/gzip:
+///   - 0: NoCompression
+///   - 1: BestSpeed
+///   - 9: BestCompression
+///   - 10: UberCompression
+///   - Other values: DefaultLevel (6)
+///
+/// # Supported Codecs
+///
+/// - `gzip` or `deflate`: Uses Deflate compression with specified level
+/// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range)
+/// - `none` or `None`: No compression
+/// - Any other value: Defaults to no compression (Codec::Null)
+///
+/// # Compression Levels
+///
+/// The compression level mapping is based on miniz_oxide's CompressionLevel 
enum:
+/// - Level 0: No compression
+/// - Level 1: Best speed (fastest)
+/// - Level 9: Best compression (slower, better compression)
+/// - Level 10: Uber compression (slowest, best compression)
+/// - Other: Default level (balanced speed/compression)
+pub(crate) fn codec_from_str(codec: Option<&str>, level: u8) -> Codec {
+    use apache_avro::{DeflateSettings, ZstandardSettings};
+
+    match codec {
+        Some("gzip") | Some("deflate") => {
+            // Map compression level to miniz_oxide::deflate::CompressionLevel
+            // Reference: 
https://docs.rs/miniz_oxide/latest/miniz_oxide/deflate/enum.CompressionLevel.html
+            use miniz_oxide::deflate::CompressionLevel;
+
+            let compression_level = match level {
+                0 => CompressionLevel::NoCompression,
+                1 => CompressionLevel::BestSpeed,
+                9 => CompressionLevel::BestCompression,
+                10 => CompressionLevel::UberCompression,
+                _ => CompressionLevel::DefaultLevel,
+            };
+
+            Codec::Deflate(DeflateSettings::new(compression_level))
+        }
+        Some("zstd") => {
+            // Zstandard supports levels 0-22, clamp to valid range
+            let zstd_level = level.min(22);

Review Comment:
   BTW, Java sets the default zstd compression level to 1, but I think it's fine to
diverge here.



##########
crates/iceberg/src/spec/table_properties.rs:
##########
@@ -137,6 +143,21 @@ impl TableProperties {
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = 
"write.target-file-size-bytes";
     /// Default target file size
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 
1024 * 1024; // 512 MB
+
+    /// Compression codec for metadata files (JSON)
+    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = 
"write.metadata.compression-codec";
+    /// Default metadata compression codec - none
+    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
+
+    /// Compression codec for Avro files (manifests, manifest lists)
+    pub const PROPERTY_AVRO_COMPRESSION_CODEC: &str = 
"write.avro.compression-codec";
+    /// Default Avro compression codec - gzip
+    pub const PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT: &str = "gzip";
+
+    /// Compression level for Avro files
+    pub const PROPERTY_AVRO_COMPRESSION_LEVEL: &str = 
"write.avro.compression-level";
+    /// Default Avro compression level (9 = BestCompression)
+    pub const PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT: u8 = 9;

Review Comment:
   In the [Iceberg 
docs](https://iceberg.apache.org/docs/nightly/configuration/#write-properties),
   ```
   write.avro.compression-level null    Avro compression level
   ```
   Each compression codec has its own default value 
[here](https://github.com/apache/iceberg/blob/571b696106f4748d3edee9ee641e214377ddcb16/core/src/main/java/org/apache/iceberg/avro/Avro.java#L259-L267)
   And gzip defaults to level 9
   
   Should we follow the same pattern? 



##########
crates/iceberg/src/spec/avro_util.rs:
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for working with Apache Avro in Iceberg.
+
+use apache_avro::Codec;
+use log::warn;
+
+/// Settings for compression codec and level.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CompressionSettings {
+    /// The compression codec name (e.g., "gzip", "zstd", "deflate", "none")
+    pub codec: String,
+    /// The compression level
+    pub level: u8,
+}
+
+impl CompressionSettings {
+    /// Create a new CompressionSettings with the specified codec and level.
+    pub fn new(codec: String, level: u8) -> Self {
+        Self { codec, level }
+    }
+
+    /// Convert to apache_avro::Codec using the codec_from_str helper function.
+    pub(crate) fn to_codec(&self) -> Codec {
+        codec_from_str(Some(&self.codec), self.level)
+    }
+}
+
+impl Default for CompressionSettings {
+    fn default() -> Self {
+        use crate::spec::TableProperties;
+        Self {
+            codec: 
TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(),
+            level: TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT,
+        }
+    }
+}
+
+/// Convert codec name and level to apache_avro::Codec.
+/// Returns Codec::Null for unknown or unsupported codecs.
+///
+/// # Arguments
+///
+/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", 
"deflate", "none")
+/// * `level` - The compression level. For deflate/gzip:
+///   - 0: NoCompression
+///   - 1: BestSpeed
+///   - 9: BestCompression
+///   - 10: UberCompression
+///   - Other values: DefaultLevel (6)
+///
+/// # Supported Codecs
+///
+/// - `gzip` or `deflate`: Uses Deflate compression with specified level
+/// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range)
+/// - `none` or `None`: No compression

Review Comment:
   I think this should be `uncompressed` instead of `none`, to match the
string Java uses.
   
   
https://github.com/apache/iceberg/blob/571b696106f4748d3edee9ee641e214377ddcb16/core/src/main/java/org/apache/iceberg/avro/Avro.java#L252-L267



##########
crates/iceberg/src/spec/table_metadata.rs:
##########
@@ -461,9 +461,57 @@ impl TableMetadata {
         file_io: &FileIO,
         metadata_location: impl AsRef<str>,
     ) -> Result<()> {
+        use std::io::Write as _;
+
+        use flate2::write::GzEncoder;
+
+        let json_data = serde_json::to_vec(self)?;
+
+        // Check if compression is enabled via table properties
+        let codec = self
+            .properties
+            
.get(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
+            .map(|s| s.as_str())
+            
.unwrap_or(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
+
+        let (data_to_write, actual_location) = match codec {
+            "gzip" => {
+                let mut encoder = GzEncoder::new(Vec::new(), 
flate2::Compression::default());
+                encoder.write_all(&json_data).map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to compress metadata with gzip",
+                    )
+                    .with_source(e)
+                })?;
+                let compressed_data = encoder.finish().map_err(|e| {
+                    Error::new(ErrorKind::DataInvalid, "Failed to finish gzip 
compression")
+                        .with_source(e)
+                })?;
+
+                // Modify filename to add .gz before .metadata.json
+                let location = metadata_location.as_ref();
+                let new_location = if location.ends_with(".metadata.json") {
+                    location.replace(".metadata.json", ".gz.metadata.json")

Review Comment:
   😄  
https://iceberg.apache.org/spec/#naming-for-gzip-compressed-metadata-json-files
   
   I wonder if it's a good idea to just default to `metadata.json.gz` here and
not create more files with the `.gz.metadata.json` suffix.



##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -205,7 +220,9 @@ impl<'a> SnapshotProducer<'a> {
                 .default_partition_spec()
                 .as_ref()
                 .clone(),
-        );
+        )
+        .with_compression(compression);

Review Comment:
   Maybe this is a Rust stylistic thing, but I'm curious why this pattern is used
instead of adding an optional `compression` parameter to `ManifestWriterBuilder`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to