This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f93f01334 feat: Factor out CompressionCodec to common location and add 
Gzip (#2081)
f93f01334 is described below

commit f93f01334cf34e618d157bbe0a10a108be2ddb0a
Author: emkornfield <[email protected]>
AuthorDate: Wed Jan 28 17:32:30 2026 -0800

    feat: Factor out CompressionCodec to common location and add Gzip (#2081)
    
    ## Which issue does this PR close?
    
    As discussed in the PR for [allowing compressed
    
metadata.json](https://github.com/apache/iceberg-rust/pull/1876#discussion_r2693266830)
    writes we want CompressionCodec in a central place so it can be used in
    puffin and for other compression use-cases. Happy to move it to `spec/`
    as suggested in the original comment but this seemed more natural.
    
    ## What changes are included in this PR?
    
    This moves compression.rs to a top level (this seems like the best location
    for common code, but please let me know if there is a better place).
    
    - It adds Gzip as a compression option and replaces the current direct use
    of the GzEncoder package.
    - It adds validation to puffin that Gzip is not currently supported (per
    spec)
    
    ## Are these changes tested?
    
    Added unit tests to cover additions and existing tests cover the rest.
---
 crates/iceberg/src/{puffin => }/compression.rs | 79 +++++++++++++++-----------
 crates/iceberg/src/lib.rs                      |  1 +
 crates/iceberg/src/puffin/metadata.rs          | 32 ++++++++++-
 crates/iceberg/src/puffin/mod.rs               | 47 ++++++++++++++-
 crates/iceberg/src/puffin/reader.rs            | 41 ++++++++++++-
 crates/iceberg/src/puffin/test_utils.rs        |  2 +-
 crates/iceberg/src/puffin/writer.rs            | 30 ++++++++--
 crates/iceberg/src/spec/table_metadata.rs      | 33 ++++++-----
 8 files changed, 204 insertions(+), 61 deletions(-)

diff --git a/crates/iceberg/src/puffin/compression.rs 
b/crates/iceberg/src/compression.rs
similarity index 57%
rename from crates/iceberg/src/puffin/compression.rs
rename to crates/iceberg/src/compression.rs
index a9a56ef12..1218d81df 100644
--- a/crates/iceberg/src/puffin/compression.rs
+++ b/crates/iceberg/src/compression.rs
@@ -15,6 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Compression codec support for data compression and decompression.
+
+use std::io::{Read, Write};
+
+use flate2::Compression;
+use flate2::read::GzDecoder;
+use flate2::write::GzEncoder;
 use serde::{Deserialize, Serialize};
 
 use crate::{Error, ErrorKind, Result};
@@ -30,6 +37,8 @@ pub enum CompressionCodec {
     Lz4,
     /// Zstandard single compression frame with content size present
     Zstd,
+    /// Gzip compression
+    Gzip,
 }
 
 impl CompressionCodec {
@@ -40,8 +49,11 @@ impl CompressionCodec {
                 ErrorKind::FeatureUnsupported,
                 "LZ4 decompression is not supported currently",
             )),
-            CompressionCodec::Zstd => {
-                let decompressed = zstd::stream::decode_all(&bytes[..])?;
+            CompressionCodec::Zstd => 
Ok(zstd::stream::decode_all(&bytes[..])?),
+            CompressionCodec::Gzip => {
+                let mut decoder = GzDecoder::new(&bytes[..]);
+                let mut decompressed = Vec::new();
+                decoder.read_to_end(&mut decompressed)?;
                 Ok(decompressed)
             }
         }
@@ -60,8 +72,12 @@ impl CompressionCodec {
                 encoder.include_checksum(true)?;
                 encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
                 std::io::copy(&mut &bytes[..], &mut encoder)?;
-                let compressed = encoder.finish()?;
-                Ok(compressed)
+                Ok(encoder.finish()?)
+            }
+            CompressionCodec::Gzip => {
+                let mut encoder = GzEncoder::new(Vec::new(), 
Compression::default());
+                encoder.write_all(&bytes)?;
+                Ok(encoder.finish()?)
             }
         }
     }
@@ -73,51 +89,48 @@ impl CompressionCodec {
 
 #[cfg(test)]
 mod tests {
-    use crate::puffin::compression::CompressionCodec;
+    use super::CompressionCodec;
 
     #[tokio::test]
     async fn test_compression_codec_none() {
-        let compression_codec = CompressionCodec::None;
         let bytes_vec = [0_u8; 100].to_vec();
 
-        let compressed = 
compression_codec.compress(bytes_vec.clone()).unwrap();
+        let codec = CompressionCodec::None;
+        let compressed = codec.compress(bytes_vec.clone()).unwrap();
         assert_eq!(bytes_vec, compressed);
-
-        let decompressed = 
compression_codec.decompress(compressed.clone()).unwrap();
-        assert_eq!(compressed, decompressed)
+        let decompressed = codec.decompress(compressed).unwrap();
+        assert_eq!(bytes_vec, decompressed);
     }
 
     #[tokio::test]
-    async fn test_compression_codec_lz4() {
-        let compression_codec = CompressionCodec::Lz4;
+    async fn test_compression_codec_compress() {
         let bytes_vec = [0_u8; 100].to_vec();
 
-        assert_eq!(
-            compression_codec
-                .compress(bytes_vec.clone())
-                .unwrap_err()
-                .to_string(),
-            "FeatureUnsupported => LZ4 compression is not supported currently",
-        );
-
-        assert_eq!(
-            compression_codec
-                .decompress(bytes_vec.clone())
-                .unwrap_err()
-                .to_string(),
-            "FeatureUnsupported => LZ4 decompression is not supported 
currently",
-        )
+        let compression_codecs = [CompressionCodec::Zstd, 
CompressionCodec::Gzip];
+
+        for codec in compression_codecs {
+            let compressed = codec.compress(bytes_vec.clone()).unwrap();
+            assert!(compressed.len() < bytes_vec.len());
+            let decompressed = codec.decompress(compressed).unwrap();
+            assert_eq!(decompressed, bytes_vec);
+        }
     }
 
     #[tokio::test]
-    async fn test_compression_codec_zstd() {
-        let compression_codec = CompressionCodec::Zstd;
+    async fn test_compression_codec_unsupported() {
+        let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")];
         let bytes_vec = [0_u8; 100].to_vec();
 
-        let compressed = 
compression_codec.compress(bytes_vec.clone()).unwrap();
-        assert!(compressed.len() < bytes_vec.len());
+        for (codec, name) in unsupported_codecs {
+            assert_eq!(
+                codec.compress(bytes_vec.clone()).unwrap_err().to_string(),
+                format!("FeatureUnsupported => {name} compression is not 
supported currently"),
+            );
 
-        let decompressed = 
compression_codec.decompress(compressed.clone()).unwrap();
-        assert_eq!(decompressed, bytes_vec)
+            assert_eq!(
+                codec.decompress(bytes_vec.clone()).unwrap_err().to_string(),
+                format!("FeatureUnsupported => {name} decompression is not 
supported currently"),
+            );
+        }
     }
 }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 8d8f40f72..8b345deb6 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -77,6 +77,7 @@ pub mod table;
 
 mod avro;
 pub mod cache;
+pub mod compression;
 pub mod io;
 pub mod spec;
 
diff --git a/crates/iceberg/src/puffin/metadata.rs 
b/crates/iceberg/src/puffin/metadata.rs
index 15a8e9b33..b09f2c7c1 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -20,8 +20,8 @@ use std::collections::{HashMap, HashSet};
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 
+use crate::compression::CompressionCodec;
 use crate::io::{FileRead, InputFile};
-use crate::puffin::compression::CompressionCodec;
 use crate::{Error, ErrorKind, Result};
 
 /// Human-readable identification of the application writing the file, along 
with its version.
@@ -954,4 +954,34 @@ mod tests {
 
         assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
     }
+
+    #[tokio::test]
+    async fn test_gzip_compression_allowed_in_metadata() {
+        let temp_dir = TempDir::new().unwrap();
+
+        // Create a JSON payload with Gzip compression codec
+        // Metadata should be readable, but accessing the blob will fail
+        let payload = r#"{
+            "blobs": [
+                {
+                    "type": "test-type",
+                    "fields": [1],
+                    "snapshot-id": 1,
+                    "sequence-number": 1,
+                    "offset": 4,
+                    "length": 10,
+                    "compression-codec": "gzip"
+                }
+            ]
+        }"#;
+
+        let input_file = input_file_with_payload(&temp_dir, payload).await;
+
+        // Reading metadata should succeed (lazy validation)
+        let result = FileMetadata::read(&input_file).await;
+        assert!(result.is_ok());
+        let metadata = result.unwrap();
+        assert_eq!(metadata.blobs.len(), 1);
+        assert_eq!(metadata.blobs[0].compression_codec, 
CompressionCodec::Gzip);
+    }
 }
diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs
index 0a0378165..854d4070f 100644
--- a/crates/iceberg/src/puffin/mod.rs
+++ b/crates/iceberg/src/puffin/mod.rs
@@ -19,11 +19,38 @@
 
 #![deny(missing_docs)]
 
+use crate::{Error, ErrorKind, Result};
+
 mod blob;
 pub use blob::{APACHE_DATASKETCHES_THETA_V1, Blob, DELETION_VECTOR_V1};
 
-mod compression;
-pub use compression::CompressionCodec;
+pub use crate::compression::CompressionCodec;
+
+/// Compression codecs supported by the Puffin spec.
+const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[
+    CompressionCodec::None,
+    CompressionCodec::Lz4,
+    CompressionCodec::Zstd,
+];
+
+/// Validates that the compression codec is supported for Puffin files.
+/// Returns an error if the codec is not supported.
+fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> {
+    if !SUPPORTED_PUFFIN_CODECS.contains(&codec) {
+        let supported_names: Vec<String> = SUPPORTED_PUFFIN_CODECS
+            .iter()
+            .map(|c| format!("{c:?}"))
+            .collect();
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Compression codec {codec:?} is not supported for Puffin 
files. Only {} are supported.",
+                supported_names.join(", ")
+            ),
+        ));
+    }
+    Ok(())
+}
 
 mod metadata;
 pub use metadata::{BlobMetadata, CREATED_BY_PROPERTY, FileMetadata};
@@ -36,3 +63,19 @@ pub use writer::PuffinWriter;
 
 #[cfg(test)]
 mod test_utils;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_puffin_codec_validation() {
+        // All codecs in SUPPORTED_PUFFIN_CODECS should be valid
+        for codec in SUPPORTED_PUFFIN_CODECS {
+            assert!(validate_puffin_compression(*codec).is_ok());
+        }
+
+        // Gzip should not be supported for Puffin files
+        assert!(validate_puffin_compression(CompressionCodec::Gzip).is_err());
+    }
+}
diff --git a/crates/iceberg/src/puffin/reader.rs 
b/crates/iceberg/src/puffin/reader.rs
index dce53d93f..a6308dceb 100644
--- a/crates/iceberg/src/puffin/reader.rs
+++ b/crates/iceberg/src/puffin/reader.rs
@@ -17,6 +17,7 @@
 
 use tokio::sync::OnceCell;
 
+use super::validate_puffin_compression;
 use crate::Result;
 use crate::io::{FileRead, InputFile};
 use crate::puffin::blob::Blob;
@@ -46,11 +47,13 @@ impl PuffinReader {
 
     /// Returns blob
     pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
+        validate_puffin_compression(blob_metadata.compression_codec)?;
+
         let file_read = self.input_file.reader().await?;
         let start = blob_metadata.offset;
         let end = start + blob_metadata.length;
-        let bytes = file_read.read(start..end).await?.to_vec();
-        let data = blob_metadata.compression_codec.decompress(bytes)?;
+        let bytes = file_read.read(start..end).await?;
+        let data = blob_metadata.compression_codec.decompress(bytes.to_vec())?;
 
         Ok(Blob {
             r#type: blob_metadata.r#type.clone(),
@@ -65,7 +68,11 @@ impl PuffinReader {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
 
+    use crate::ErrorKind;
+    use crate::compression::CompressionCodec;
+    use crate::puffin::metadata::BlobMetadata;
     use crate::puffin::reader::PuffinReader;
     use crate::puffin::test_utils::{
         blob_0, blob_1, java_uncompressed_metric_input_file,
@@ -122,4 +129,34 @@ mod tests {
             blob_1(),
         )
     }
+
+    #[tokio::test]
+    async fn test_gzip_compression_rejected_on_blob_access() {
+        // Use a real puffin file
+        let input_file = java_uncompressed_metric_input_file();
+        let reader = PuffinReader::new(input_file);
+
+        // Create a BlobMetadata with Gzip compression
+        let gzip_blob_metadata = BlobMetadata {
+            r#type: "test-type".to_string(),
+            fields: vec![1],
+            snapshot_id: 1,
+            sequence_number: 1,
+            offset: 4,
+            length: 10,
+            compression_codec: CompressionCodec::Gzip,
+            properties: HashMap::new(),
+        };
+
+        // Attempting to access the blob should fail
+        let result = reader.blob(&gzip_blob_metadata).await;
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(err.to_string().contains("Gzip"));
+        assert!(
+            err.to_string()
+                .contains("is not supported for Puffin files")
+        );
+    }
 }
diff --git a/crates/iceberg/src/puffin/test_utils.rs 
b/crates/iceberg/src/puffin/test_utils.rs
index ca91fa217..a3232f53e 100644
--- a/crates/iceberg/src/puffin/test_utils.rs
+++ b/crates/iceberg/src/puffin/test_utils.rs
@@ -18,8 +18,8 @@
 use std::collections::HashMap;
 
 use super::blob::Blob;
+use crate::compression::CompressionCodec;
 use crate::io::{FileIOBuilder, InputFile};
-use crate::puffin::compression::CompressionCodec;
 use crate::puffin::metadata::{BlobMetadata, CREATED_BY_PROPERTY, FileMetadata};
 
 const JAVA_TESTDATA: &str = "testdata/puffin/java-generated";
diff --git a/crates/iceberg/src/puffin/writer.rs 
b/crates/iceberg/src/puffin/writer.rs
index f68efda88..c8e4233a9 100644
--- a/crates/iceberg/src/puffin/writer.rs
+++ b/crates/iceberg/src/puffin/writer.rs
@@ -19,10 +19,11 @@ use std::collections::{HashMap, HashSet};
 
 use bytes::Bytes;
 
+use super::validate_puffin_compression;
 use crate::Result;
+use crate::compression::CompressionCodec;
 use crate::io::{FileWrite, OutputFile};
 use crate::puffin::blob::Blob;
-use crate::puffin::compression::CompressionCodec;
 use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
 
 /// Puffin writer
@@ -64,6 +65,8 @@ impl PuffinWriter {
 
     /// Adds blob to Puffin file
     pub async fn add(&mut self, blob: Blob, compression_codec: 
CompressionCodec) -> Result<()> {
+        validate_puffin_compression(compression_codec)?;
+
         self.write_header_once().await?;
 
         let offset = self.num_bytes_written;
@@ -114,8 +117,7 @@ impl PuffinWriter {
             properties: self.properties.clone(),
         };
         let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
-        let bytes = json.as_bytes();
-        self.footer_compression_codec.compress(bytes.to_vec())
+        self.footer_compression_codec.compress(json.into_bytes())
     }
 
     fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as 
usize] {
@@ -148,10 +150,9 @@ mod tests {
 
     use tempfile::TempDir;
 
-    use crate::Result;
+    use crate::compression::CompressionCodec;
     use crate::io::{FileIOBuilder, InputFile, OutputFile};
     use crate::puffin::blob::Blob;
-    use crate::puffin::compression::CompressionCodec;
     use crate::puffin::metadata::FileMetadata;
     use crate::puffin::reader::PuffinReader;
     use crate::puffin::test_utils::{
@@ -161,6 +162,7 @@ mod tests {
         zstd_compressed_metric_file_metadata,
     };
     use crate::puffin::writer::PuffinWriter;
+    use crate::{ErrorKind, Result};
 
     async fn write_puffin_file(
         temp_dir: &TempDir,
@@ -331,4 +333,22 @@ mod tests {
         )
         .await
     }
+
+    #[tokio::test]
+    async fn test_gzip_compression_rejected() {
+        let temp_dir = TempDir::new().unwrap();
+        let blobs = vec![blob_0()];
+        let blobs_with_compression = blobs_with_compression(blobs, 
CompressionCodec::Gzip);
+
+        let result = write_puffin_file(&temp_dir, blobs_with_compression, 
file_properties()).await;
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(err.to_string().contains("Gzip"));
+        assert!(
+            err.to_string()
+                .contains("is not supported for Puffin files")
+        );
+    }
 }
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index d3836b2f4..28f753e9f 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -22,12 +22,10 @@ use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
-use std::io::Read as _;
 use std::sync::Arc;
 
 use _serde::TableMetadataEnum;
 use chrono::{DateTime, Utc};
-use flate2::read::GzDecoder;
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use uuid::Uuid;
@@ -39,6 +37,7 @@ use super::{
     SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, 
StructType,
     TableProperties,
 };
+use crate::compression::CompressionCodec;
 use crate::error::{Result, timestamp_ms_to_utc};
 use crate::io::FileIO;
 use crate::spec::EncryptedKey;
@@ -445,16 +444,16 @@ impl TableMetadata {
             && metadata_content[0] == 0x1F
             && metadata_content[1] == 0x8B
         {
-            let mut decoder = GzDecoder::new(metadata_content.as_ref());
-            let mut decompressed_data = Vec::new();
-            decoder.read_to_end(&mut decompressed_data).map_err(|e| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    "Trying to read compressed metadata file",
-                )
-                .with_context("file_path", metadata_location)
-                .with_source(e)
-            })?;
+            let decompressed_data = CompressionCodec::Gzip
+                .decompress(metadata_content.to_vec())
+                .map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Trying to read compressed metadata file",
+                    )
+                    .with_context("file_path", metadata_location)
+                    .with_source(e)
+                })?;
             serde_json::from_slice(&decompressed_data)?
         } else {
             serde_json::from_slice(&metadata_content)?
@@ -1559,7 +1558,6 @@ impl SnapshotLog {
 mod tests {
     use std::collections::HashMap;
     use std::fs;
-    use std::io::Write as _;
     use std::sync::Arc;
 
     use anyhow::Result;
@@ -1569,6 +1567,7 @@ mod tests {
     use uuid::Uuid;
 
     use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
+    use crate::compression::CompressionCodec;
     use crate::io::FileIOBuilder;
     use crate::spec::table_metadata::TableMetadata;
     use crate::spec::{
@@ -3576,10 +3575,10 @@ mod tests {
         let original_metadata: TableMetadata = 
get_test_table_metadata("TableMetadataV2Valid.json");
         let json = serde_json::to_string(&original_metadata).unwrap();
 
-        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), 
flate2::Compression::default());
-        encoder.write_all(json.as_bytes()).unwrap();
-        std::fs::write(&metadata_location, encoder.finish().unwrap())
-            .expect("failed to write metadata");
+        let compressed = CompressionCodec::Gzip
+            .compress(json.into_bytes())
+            .expect("failed to compress metadata");
+        std::fs::write(&metadata_location, &compressed).expect("failed to 
write metadata");
 
         // Read the metadata back
         let file_io = FileIOBuilder::new_fs_io().build().unwrap();

Reply via email to