This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f93f01334 feat: Factor out CompressionCodec to common location and add Gzip (#2081)
f93f01334 is described below
commit f93f01334cf34e618d157bbe0a10a108be2ddb0a
Author: emkornfield <[email protected]>
AuthorDate: Wed Jan 28 17:32:30 2026 -0800
feat: Factor out CompressionCodec to common location and add Gzip (#2081)
## Which issue does this PR close?
As discussed in the PR for [allowing compressed metadata.json
writes](https://github.com/apache/iceberg-rust/pull/1876#discussion_r2693266830),
we want `CompressionCodec` in a central place so it can be used in
Puffin and for other compression use cases. Happy to move it to `spec/`
as suggested in the original comment, but this seemed more natural.
## What changes are included in this PR?
This moves compression.rs to the crate's top level (this seems like the
best location for common code, but please let me know if there is a
better place).
- It adds Gzip as a compression option and replaces the current direct
use of the flate2 `GzEncoder`/`GzDecoder` in table metadata handling.
- It adds validation so Puffin rejects Gzip, which the Puffin spec does
not currently allow; see the sketch after this list.
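
For illustration, a minimal sketch of the relocated API, assuming the
paths this diff introduces (`iceberg::compression::CompressionCodec`,
re-exported from `iceberg::puffin`); the `round_trip` helper is
hypothetical, not part of this change:

```rust
use iceberg::Result;
use iceberg::compression::CompressionCodec;

// Hypothetical helper: compress then decompress with the same codec.
fn round_trip(codec: CompressionCodec, data: Vec<u8>) -> Result<Vec<u8>> {
    let compressed = codec.compress(data)?;
    codec.decompress(compressed)
}

fn main() -> Result<()> {
    let data = vec![0u8; 100];
    // Zstd and Gzip both round-trip; Lz4 is still declared but
    // unimplemented and errors with FeatureUnsupported in both directions.
    assert_eq!(round_trip(CompressionCodec::Gzip, data.clone())?, data);
    assert_eq!(round_trip(CompressionCodec::Zstd, data.clone())?, data);
    assert!(CompressionCodec::Lz4.compress(data).is_err());
    Ok(())
}
```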
## Are these changes tested?
Added unit tests to cover the additions; existing tests cover the rest.
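
In the same spirit, a self-contained sketch of the metadata.json gzip
path (the `is_gzipped` helper is illustrative; the 0x1F/0x8B magic-byte
check mirrors what `TableMetadata::read` does in this diff):

```rust
use iceberg::Result;
use iceberg::compression::CompressionCodec;

// Illustrative helper mirroring the magic-byte sniff TableMetadata::read
// performs before routing compressed metadata.json through the codec.
fn is_gzipped(content: &[u8]) -> bool {
    content.len() >= 2 && content[0] == 0x1F && content[1] == 0x8B
}

fn main() -> Result<()> {
    let json = br#"{"format-version": 2}"#.to_vec();
    let compressed = CompressionCodec::Gzip.compress(json.clone())?;
    assert!(is_gzipped(&compressed));
    assert_eq!(CompressionCodec::Gzip.decompress(compressed)?, json);
    Ok(())
}
```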
---
crates/iceberg/src/{puffin => }/compression.rs | 79 +++++++++++++++-----------
crates/iceberg/src/lib.rs | 1 +
crates/iceberg/src/puffin/metadata.rs | 32 ++++++++++-
crates/iceberg/src/puffin/mod.rs | 47 ++++++++++++++-
crates/iceberg/src/puffin/reader.rs | 41 ++++++++++++-
crates/iceberg/src/puffin/test_utils.rs | 2 +-
crates/iceberg/src/puffin/writer.rs | 30 ++++++++--
crates/iceberg/src/spec/table_metadata.rs | 33 ++++++-----
8 files changed, 204 insertions(+), 61 deletions(-)
diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/compression.rs
similarity index 57%
rename from crates/iceberg/src/puffin/compression.rs
rename to crates/iceberg/src/compression.rs
index a9a56ef12..1218d81df 100644
--- a/crates/iceberg/src/puffin/compression.rs
+++ b/crates/iceberg/src/compression.rs
@@ -15,6 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+//! Compression codec support for data compression and decompression.
+
+use std::io::{Read, Write};
+
+use flate2::Compression;
+use flate2::read::GzDecoder;
+use flate2::write::GzEncoder;
use serde::{Deserialize, Serialize};
use crate::{Error, ErrorKind, Result};
@@ -30,6 +37,8 @@ pub enum CompressionCodec {
Lz4,
/// Zstandard single compression frame with content size present
Zstd,
+ /// Gzip compression
+ Gzip,
}
impl CompressionCodec {
@@ -40,8 +49,11 @@ impl CompressionCodec {
ErrorKind::FeatureUnsupported,
"LZ4 decompression is not supported currently",
)),
- CompressionCodec::Zstd => {
- let decompressed = zstd::stream::decode_all(&bytes[..])?;
+ CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?),
+ CompressionCodec::Gzip => {
+ let mut decoder = GzDecoder::new(&bytes[..]);
+ let mut decompressed = Vec::new();
+ decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
}
@@ -60,8 +72,12 @@ impl CompressionCodec {
encoder.include_checksum(true)?;
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
std::io::copy(&mut &bytes[..], &mut encoder)?;
- let compressed = encoder.finish()?;
- Ok(compressed)
+ Ok(encoder.finish()?)
+ }
+ CompressionCodec::Gzip => {
+ let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+ encoder.write_all(&bytes)?;
+ Ok(encoder.finish()?)
}
}
}
@@ -73,51 +89,48 @@ impl CompressionCodec {
#[cfg(test)]
mod tests {
- use crate::puffin::compression::CompressionCodec;
+ use super::CompressionCodec;
#[tokio::test]
async fn test_compression_codec_none() {
- let compression_codec = CompressionCodec::None;
let bytes_vec = [0_u8; 100].to_vec();
- let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
+ let codec = CompressionCodec::None;
+ let compressed = codec.compress(bytes_vec.clone()).unwrap();
assert_eq!(bytes_vec, compressed);
-
- let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
- assert_eq!(compressed, decompressed)
+ let decompressed = codec.decompress(compressed).unwrap();
+ assert_eq!(bytes_vec, decompressed);
}
#[tokio::test]
- async fn test_compression_codec_lz4() {
- let compression_codec = CompressionCodec::Lz4;
+ async fn test_compression_codec_compress() {
let bytes_vec = [0_u8; 100].to_vec();
- assert_eq!(
- compression_codec
- .compress(bytes_vec.clone())
- .unwrap_err()
- .to_string(),
- "FeatureUnsupported => LZ4 compression is not supported currently",
- );
-
- assert_eq!(
- compression_codec
- .decompress(bytes_vec.clone())
- .unwrap_err()
- .to_string(),
- "FeatureUnsupported => LZ4 decompression is not supported
currently",
- )
+ let compression_codecs = [CompressionCodec::Zstd, CompressionCodec::Gzip];
+
+ for codec in compression_codecs {
+ let compressed = codec.compress(bytes_vec.clone()).unwrap();
+ assert!(compressed.len() < bytes_vec.len());
+ let decompressed = codec.decompress(compressed).unwrap();
+ assert_eq!(decompressed, bytes_vec);
+ }
}
#[tokio::test]
- async fn test_compression_codec_zstd() {
- let compression_codec = CompressionCodec::Zstd;
+ async fn test_compression_codec_unsupported() {
+ let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")];
let bytes_vec = [0_u8; 100].to_vec();
- let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
- assert!(compressed.len() < bytes_vec.len());
+ for (codec, name) in unsupported_codecs {
+ assert_eq!(
+ codec.compress(bytes_vec.clone()).unwrap_err().to_string(),
+ format!("FeatureUnsupported => {name} compression is not
supported currently"),
+ );
- let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
- assert_eq!(decompressed, bytes_vec)
+ assert_eq!(
+ codec.decompress(bytes_vec.clone()).unwrap_err().to_string(),
+ format!("FeatureUnsupported => {name} decompression is not
supported currently"),
+ );
+ }
}
}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 8d8f40f72..8b345deb6 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -77,6 +77,7 @@ pub mod table;
mod avro;
pub mod cache;
+pub mod compression;
pub mod io;
pub mod spec;
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index 15a8e9b33..b09f2c7c1 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -20,8 +20,8 @@ use std::collections::{HashMap, HashSet};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
+use crate::compression::CompressionCodec;
use crate::io::{FileRead, InputFile};
-use crate::puffin::compression::CompressionCodec;
use crate::{Error, ErrorKind, Result};
/// Human-readable identification of the application writing the file, along with its version.
@@ -954,4 +954,34 @@ mod tests {
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
}
+
+ #[tokio::test]
+ async fn test_gzip_compression_allowed_in_metadata() {
+ let temp_dir = TempDir::new().unwrap();
+
+ // Create a JSON payload with Gzip compression codec
+ // Metadata should be readable, but accessing the blob will fail
+ let payload = r#"{
+ "blobs": [
+ {
+ "type": "test-type",
+ "fields": [1],
+ "snapshot-id": 1,
+ "sequence-number": 1,
+ "offset": 4,
+ "length": 10,
+ "compression-codec": "gzip"
+ }
+ ]
+ }"#;
+
+ let input_file = input_file_with_payload(&temp_dir, payload).await;
+
+ // Reading metadata should succeed (lazy validation)
+ let result = FileMetadata::read(&input_file).await;
+ assert!(result.is_ok());
+ let metadata = result.unwrap();
+ assert_eq!(metadata.blobs.len(), 1);
+ assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip);
+ }
}
diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs
index 0a0378165..854d4070f 100644
--- a/crates/iceberg/src/puffin/mod.rs
+++ b/crates/iceberg/src/puffin/mod.rs
@@ -19,11 +19,38 @@
#![deny(missing_docs)]
+use crate::{Error, ErrorKind, Result};
+
mod blob;
pub use blob::{APACHE_DATASKETCHES_THETA_V1, Blob, DELETION_VECTOR_V1};
-mod compression;
-pub use compression::CompressionCodec;
+pub use crate::compression::CompressionCodec;
+
+/// Compression codecs supported by the Puffin spec.
+const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[
+ CompressionCodec::None,
+ CompressionCodec::Lz4,
+ CompressionCodec::Zstd,
+];
+
+/// Validates that the compression codec is supported for Puffin files.
+/// Returns an error if the codec is not supported.
+fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> {
+ if !SUPPORTED_PUFFIN_CODECS.contains(&codec) {
+ let supported_names: Vec<String> = SUPPORTED_PUFFIN_CODECS
+ .iter()
+ .map(|c| format!("{c:?}"))
+ .collect();
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Compression codec {codec:?} is not supported for Puffin
files. Only {} are supported.",
+ supported_names.join(", ")
+ ),
+ ));
+ }
+ Ok(())
+}
mod metadata;
pub use metadata::{BlobMetadata, CREATED_BY_PROPERTY, FileMetadata};
@@ -36,3 +63,19 @@ pub use writer::PuffinWriter;
#[cfg(test)]
mod test_utils;
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_puffin_codec_validation() {
+ // All codecs in SUPPORTED_PUFFIN_CODECS should be valid
+ for codec in SUPPORTED_PUFFIN_CODECS {
+ assert!(validate_puffin_compression(*codec).is_ok());
+ }
+
+ // Gzip should not be supported for Puffin files
+ assert!(validate_puffin_compression(CompressionCodec::Gzip).is_err());
+ }
+}
diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs
index dce53d93f..a6308dceb 100644
--- a/crates/iceberg/src/puffin/reader.rs
+++ b/crates/iceberg/src/puffin/reader.rs
@@ -17,6 +17,7 @@
use tokio::sync::OnceCell;
+use super::validate_puffin_compression;
use crate::Result;
use crate::io::{FileRead, InputFile};
use crate::puffin::blob::Blob;
@@ -46,11 +47,13 @@ impl PuffinReader {
/// Returns blob
pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
+ validate_puffin_compression(blob_metadata.compression_codec)?;
+
let file_read = self.input_file.reader().await?;
let start = blob_metadata.offset;
let end = start + blob_metadata.length;
- let bytes = file_read.read(start..end).await?.to_vec();
- let data = blob_metadata.compression_codec.decompress(bytes)?;
+ let bytes = file_read.read(start..end).await?;
+ let data = blob_metadata.compression_codec.decompress(bytes.to_vec())?;
Ok(Blob {
r#type: blob_metadata.r#type.clone(),
@@ -65,7 +68,11 @@ impl PuffinReader {
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
+ use crate::ErrorKind;
+ use crate::compression::CompressionCodec;
+ use crate::puffin::metadata::BlobMetadata;
use crate::puffin::reader::PuffinReader;
use crate::puffin::test_utils::{
blob_0, blob_1, java_uncompressed_metric_input_file,
@@ -122,4 +129,34 @@ mod tests {
blob_1(),
)
}
+
+ #[tokio::test]
+ async fn test_gzip_compression_rejected_on_blob_access() {
+ // Use a real puffin file
+ let input_file = java_uncompressed_metric_input_file();
+ let reader = PuffinReader::new(input_file);
+
+ // Create a BlobMetadata with Gzip compression
+ let gzip_blob_metadata = BlobMetadata {
+ r#type: "test-type".to_string(),
+ fields: vec![1],
+ snapshot_id: 1,
+ sequence_number: 1,
+ offset: 4,
+ length: 10,
+ compression_codec: CompressionCodec::Gzip,
+ properties: HashMap::new(),
+ };
+
+ // Attempting to access the blob should fail
+ let result = reader.blob(&gzip_blob_metadata).await;
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::DataInvalid);
+ assert!(err.to_string().contains("Gzip"));
+ assert!(
+ err.to_string()
+ .contains("is not supported for Puffin files")
+ );
+ }
}
diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs
index ca91fa217..a3232f53e 100644
--- a/crates/iceberg/src/puffin/test_utils.rs
+++ b/crates/iceberg/src/puffin/test_utils.rs
@@ -18,8 +18,8 @@
use std::collections::HashMap;
use super::blob::Blob;
+use crate::compression::CompressionCodec;
use crate::io::{FileIOBuilder, InputFile};
-use crate::puffin::compression::CompressionCodec;
use crate::puffin::metadata::{BlobMetadata, CREATED_BY_PROPERTY, FileMetadata};
const JAVA_TESTDATA: &str = "testdata/puffin/java-generated";
diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs
index f68efda88..c8e4233a9 100644
--- a/crates/iceberg/src/puffin/writer.rs
+++ b/crates/iceberg/src/puffin/writer.rs
@@ -19,10 +19,11 @@ use std::collections::{HashMap, HashSet};
use bytes::Bytes;
+use super::validate_puffin_compression;
use crate::Result;
+use crate::compression::CompressionCodec;
use crate::io::{FileWrite, OutputFile};
use crate::puffin::blob::Blob;
-use crate::puffin::compression::CompressionCodec;
use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
/// Puffin writer
@@ -64,6 +65,8 @@ impl PuffinWriter {
/// Adds blob to Puffin file
pub async fn add(&mut self, blob: Blob, compression_codec:
CompressionCodec) -> Result<()> {
+ validate_puffin_compression(compression_codec)?;
+
self.write_header_once().await?;
let offset = self.num_bytes_written;
@@ -114,8 +117,7 @@ impl PuffinWriter {
properties: self.properties.clone(),
};
let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
- let bytes = json.as_bytes();
- self.footer_compression_codec.compress(bytes.to_vec())
+ self.footer_compression_codec.compress(json.into_bytes())
}
fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as
usize] {
@@ -148,10 +150,9 @@ mod tests {
use tempfile::TempDir;
- use crate::Result;
+ use crate::compression::CompressionCodec;
use crate::io::{FileIOBuilder, InputFile, OutputFile};
use crate::puffin::blob::Blob;
- use crate::puffin::compression::CompressionCodec;
use crate::puffin::metadata::FileMetadata;
use crate::puffin::reader::PuffinReader;
use crate::puffin::test_utils::{
@@ -161,6 +162,7 @@ mod tests {
zstd_compressed_metric_file_metadata,
};
use crate::puffin::writer::PuffinWriter;
+ use crate::{ErrorKind, Result};
async fn write_puffin_file(
temp_dir: &TempDir,
@@ -331,4 +333,22 @@ mod tests {
)
.await
}
+
+ #[tokio::test]
+ async fn test_gzip_compression_rejected() {
+ let temp_dir = TempDir::new().unwrap();
+ let blobs = vec![blob_0()];
+ let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Gzip);
+
+ let result = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()).await;
+
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::DataInvalid);
+ assert!(err.to_string().contains("Gzip"));
+ assert!(
+ err.to_string()
+ .contains("is not supported for Puffin files")
+ );
+ }
}
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index d3836b2f4..28f753e9f 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -22,12 +22,10 @@ use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
-use std::io::Read as _;
use std::sync::Arc;
use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
-use flate2::read::GzDecoder;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
@@ -39,6 +37,7 @@ use super::{
SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
TableProperties,
};
+use crate::compression::CompressionCodec;
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
use crate::spec::EncryptedKey;
@@ -445,16 +444,16 @@ impl TableMetadata {
&& metadata_content[0] == 0x1F
&& metadata_content[1] == 0x8B
{
- let mut decoder = GzDecoder::new(metadata_content.as_ref());
- let mut decompressed_data = Vec::new();
- decoder.read_to_end(&mut decompressed_data).map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- "Trying to read compressed metadata file",
- )
- .with_context("file_path", metadata_location)
- .with_source(e)
- })?;
+ let decompressed_data = CompressionCodec::Gzip
+ .decompress(metadata_content.to_vec())
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Trying to read compressed metadata file",
+ )
+ .with_context("file_path", metadata_location)
+ .with_source(e)
+ })?;
serde_json::from_slice(&decompressed_data)?
} else {
serde_json::from_slice(&metadata_content)?
@@ -1559,7 +1558,6 @@ impl SnapshotLog {
mod tests {
use std::collections::HashMap;
use std::fs;
- use std::io::Write as _;
use std::sync::Arc;
use anyhow::Result;
@@ -1569,6 +1567,7 @@ mod tests {
use uuid::Uuid;
use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
+ use crate::compression::CompressionCodec;
use crate::io::FileIOBuilder;
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
@@ -3576,10 +3575,10 @@ mod tests {
let original_metadata: TableMetadata =
get_test_table_metadata("TableMetadataV2Valid.json");
let json = serde_json::to_string(&original_metadata).unwrap();
- let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
- encoder.write_all(json.as_bytes()).unwrap();
- std::fs::write(&metadata_location, encoder.finish().unwrap())
- .expect("failed to write metadata");
+ let compressed = CompressionCodec::Gzip
+ .compress(json.into_bytes())
+ .expect("failed to compress metadata");
+ std::fs::write(&metadata_location, &compressed).expect("failed to
write metadata");
// Read the metadata back
let file_io = FileIOBuilder::new_fs_io().build().unwrap();