This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7addc3f5 feat(puffin): Parse Puffin FileMetadata (#765)
7addc3f5 is described below
commit 7addc3f54bccf24a68297f9c8980086e4f63f2d0
Author: Farooq Qaiser <[email protected]>
AuthorDate: Mon Jan 13 21:14:58 2025 -0500
feat(puffin): Parse Puffin FileMetadata (#765)
* Add Puffin FileMetadata
* Fix comment locations
* Put Ok(()) branch first
* Use map_err
* Inline err_out_of_bounds function
* Use ok_or_else
* Remove #[rustfmt::skip]
* Rename input_fields to fields
* Simplify flag parsing
* Remove unnecessary reference
* Make BlobMetadata.length a u64 (instead of usize)
* Replace from with as
---
crates/iceberg/src/puffin/compression.rs | 5 +-
crates/iceberg/src/puffin/metadata.rs | 777 +++++++++++++++++++++
crates/iceberg/src/puffin/mod.rs | 4 +
crates/iceberg/src/puffin/test_utils.rs | 158 +++++
.../java-generated/empty-puffin-uncompressed.bin | Bin 0 -> 32 bytes
.../sample-metric-data-compressed-zstd.bin | Bin 0 -> 417 bytes
.../sample-metric-data-uncompressed.bin | Bin 0 -> 355 bytes
7 files changed, 943 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/puffin/compression.rs
index 710698df..a9a56ef1 100644
--- a/crates/iceberg/src/puffin/compression.rs
+++ b/crates/iceberg/src/puffin/compression.rs
@@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use serde::{Deserialize, Serialize};
+
use crate::{Error, ErrorKind, Result};
-#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
/// Data compression formats
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
pub enum CompressionCodec {
#[default]
/// No compression
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
new file mode 100644
index 00000000..9d000322
--- /dev/null
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -0,0 +1,777 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {
+ /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
+ pub(crate) r#type: String,
+ /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
+ pub(crate) fields: Vec<i32>,
+ /// ID of the Iceberg table's snapshot the blob was computed from
+ pub(crate) snapshot_id: i64,
+ /// Sequence number of the Iceberg table's snapshot the blob was computed from
+ pub(crate) sequence_number: i64,
+ /// The offset in the file where the blob contents start
+ pub(crate) offset: u64,
+ /// The length of the blob stored in the file (after compression, if compressed)
+ pub(crate) length: u64,
+ /// The compression codec used to compress the data
+ #[serde(skip_serializing_if = "CompressionCodec::is_none")]
+ #[serde(default)]
+ pub(crate) compression_codec: CompressionCodec,
+ /// Arbitrary meta-information about the blob
+ #[serde(skip_serializing_if = "HashMap::is_empty")]
+ #[serde(default)]
+ pub(crate) properties: HashMap<String, String>,
+}
+
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
+pub(crate) enum Flag {
+ FooterPayloadCompressed = 0,
+}
+
+impl Flag {
+ pub(crate) fn byte_idx(self) -> u8 {
+ (self as u8) / 8
+ }
+
+ pub(crate) fn bit_idx(self) -> u8 {
+ (self as u8) % 8
+ }
+
+ fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
+ self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
+ }
+
+ fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
+ if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
+ Ok(Flag::FooterPayloadCompressed)
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Unknown flag byte {} and bit {} combination",
+ byte_idx, bit_idx
+ ),
+ ))
+ }
+ }
+}
+
+/// Metadata about a puffin file.
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+pub(crate) struct FileMetadata {
+ /// Metadata about blobs in file
+ pub(crate) blobs: Vec<BlobMetadata>,
+ /// Arbitrary meta-information, like writer identification/version.
+ #[serde(skip_serializing_if = "HashMap::is_empty")]
+ #[serde(default)]
+ pub(crate) properties: HashMap<String, String>,
+}
+
+impl FileMetadata {
+ pub(crate) const MAGIC_LENGTH: u8 = 4;
+ pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50,
0x46, 0x41, 0x31];
+
+ // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below.
+ //
+ //                       Footer
+ //                          |
+ //  -------------------------------------------------
+ // |                                                 |
+ // Magic FooterPayload FooterPayloadLength Flags Magic
+ //                     |                             |
+ //                      -----------------------------
+ //                                    |
+ //                              FOOTER_STRUCT
+
+ const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
+ const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
+ const FOOTER_STRUCT_FLAGS_OFFSET: u8 =
FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
+ + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
+ pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
+ const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
+ FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET +
FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
+ pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
+ FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
+
+ fn check_magic(bytes: &[u8]) -> Result<()> {
+ if bytes == FileMetadata::MAGIC {
+ Ok(())
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Bad magic value: {:?} should be {:?}",
+ bytes,
+ FileMetadata::MAGIC
+ ),
+ ))
+ }
+ }
+
+ async fn read_footer_payload_length(
+ file_read: &dyn FileRead,
+ input_file_length: u64,
+ ) -> Result<u32> {
+ let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as
u64;
+ let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as
u64;
+ let footer_payload_length_bytes = file_read.read(start..end).await?;
+ let mut buf = [0; 4];
+ buf.copy_from_slice(&footer_payload_length_bytes);
+ let footer_payload_length = u32::from_le_bytes(buf);
+ Ok(footer_payload_length)
+ }
+
+ async fn read_footer_bytes(
+ file_read: &dyn FileRead,
+ input_file_length: u64,
+ footer_payload_length: u32,
+ ) -> Result<Bytes> {
+ let footer_length = footer_payload_length as u64
+ + FileMetadata::FOOTER_STRUCT_LENGTH as u64
+ + FileMetadata::MAGIC_LENGTH as u64;
+ let start = input_file_length - footer_length;
+ let end = input_file_length;
+ file_read.read(start..end).await
+ }
+
+ fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
+ let mut flags = HashSet::new();
+
+ for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
+ let byte_offset = footer_bytes.len()
+ - FileMetadata::MAGIC_LENGTH as usize
+ - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
+ + byte_idx as usize;
+
+ let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
+ Error::new(ErrorKind::DataInvalid, "Index range is out of
bounds.")
+ })?;
+
+ for bit_idx in 0..8 {
+ if ((flag_byte >> bit_idx) & 1) != 0 {
+ let flag = Flag::from(byte_idx, bit_idx)?;
+ flags.insert(flag);
+ }
+ }
+ }
+
+ Ok(flags)
+ }
+
+ fn extract_footer_payload_as_str(
+ footer_bytes: &[u8],
+ footer_payload_length: u32,
+ ) -> Result<String> {
+ let flags = FileMetadata::decode_flags(footer_bytes)?;
+ let footer_compression_codec = if
flags.contains(&Flag::FooterPayloadCompressed) {
+ CompressionCodec::Lz4
+ } else {
+ CompressionCodec::None
+ };
+
+ let start_offset = FileMetadata::MAGIC_LENGTH as usize;
+ let end_offset =
+ FileMetadata::MAGIC_LENGTH as usize +
usize::try_from(footer_payload_length)?;
+ let footer_payload_bytes = footer_bytes
+ .get(start_offset..end_offset)
+ .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is
out of bounds."))?;
+ let decompressed_footer_payload_bytes =
+ footer_compression_codec.decompress(footer_payload_bytes.into())?;
+
+ String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
+ Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8
string")
+ .with_source(src)
+ })
+ }
+
+ fn from_json_str(string: &str) -> Result<FileMetadata> {
+ serde_json::from_str::<FileMetadata>(string).map_err(|src| {
+ Error::new(ErrorKind::DataInvalid, "Given string is not valid
JSON").with_source(src)
+ })
+ }
+
+ /// Returns the file metadata about a Puffin file
+ pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
+ let file_read = input_file.reader().await?;
+
+ let first_four_bytes =
file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
+ FileMetadata::check_magic(&first_four_bytes)?;
+
+ let input_file_length = input_file.metadata().await?.size;
+ let footer_payload_length =
+ FileMetadata::read_footer_payload_length(&file_read,
input_file_length).await?;
+ let footer_bytes =
+ FileMetadata::read_footer_bytes(&file_read, input_file_length,
footer_payload_length)
+ .await?;
+
+ let magic_length = FileMetadata::MAGIC_LENGTH as usize;
+ // check first four bytes of footer
+ FileMetadata::check_magic(&footer_bytes[..magic_length])?;
+ // check last four bytes of footer
+ FileMetadata::check_magic(&footer_bytes[footer_bytes.len() -
magic_length..])?;
+
+ let footer_payload_str =
+ FileMetadata::extract_footer_payload_as_str(&footer_bytes,
footer_payload_length)?;
+ FileMetadata::from_json_str(&footer_payload_str)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use bytes::Bytes;
+ use tempfile::TempDir;
+
+ use crate::io::{FileIOBuilder, InputFile};
+ use crate::puffin::metadata::{BlobMetadata, CompressionCodec,
FileMetadata};
+ use crate::puffin::test_utils::{
+ empty_footer_payload, empty_footer_payload_bytes,
empty_footer_payload_bytes_length_bytes,
+ java_empty_uncompressed_input_file,
java_uncompressed_metric_input_file,
+ java_zstd_compressed_metric_input_file,
uncompressed_metric_file_metadata,
+ zstd_compressed_metric_file_metadata,
+ };
+
+ const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];
+
+ async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) ->
InputFile {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+ let path_buf = temp_dir.path().join("abc.puffin");
+ let temp_path = path_buf.to_str().unwrap();
+ let output_file = file_io.new_output(temp_path).unwrap();
+
+ output_file
+ .write(Bytes::copy_from_slice(slice))
+ .await
+ .unwrap();
+
+ output_file.to_input_file()
+ }
+
+ async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) ->
InputFile {
+ let payload_bytes = payload_str.as_bytes();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(payload_bytes);
+ bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32));
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC);
+
+ input_file_with_bytes(temp_dir, &bytes).await
+ }
+
+ #[tokio::test]
+ async fn test_file_starting_with_invalid_magic_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(INVALID_MAGIC_VALUE.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(empty_footer_payload_bytes_length_bytes());
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC);
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80,
70, 65, 49]",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(INVALID_MAGIC_VALUE.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(empty_footer_payload_bytes_length_bytes());
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC);
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80,
70, 65, 49]",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_file_ending_with_invalid_magic_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(empty_footer_payload_bytes_length_bytes());
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(INVALID_MAGIC_VALUE);
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80,
70, 65, 49]",
+ )
+ }
+
+ #[tokio::test]
+ async fn
test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(u32::to_le_bytes(
+ empty_footer_payload_bytes().len() as u32 + 1,
+ ));
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80,
70, 65, 49]",
+ )
+ }
+
+ #[tokio::test]
+ async fn
test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(u32::to_le_bytes(
+ empty_footer_payload_bytes().len() as u32 - 1,
+ ));
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80,
70, 65, 49]",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_lz4_compressed_footer_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(empty_footer_payload_bytes_length_bytes());
+ bytes.extend(vec![0b00000001, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "FeatureUnsupported => LZ4 decompression is not supported
currently",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_unknown_byte_bit_combination_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(empty_footer_payload_bytes_length_bytes());
+ bytes.extend(vec![0b00000010, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file)
+ .await
+ .unwrap_err()
+ .to_string(),
+ "DataInvalid => Unknown flag byte 0 and bit 1 combination",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_non_utf8_string_payload_returns_error() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let payload_bytes: [u8; 4] = [0, 159, 146, 150];
+ let payload_bytes_length_bytes: [u8; 4] =
u32::to_le_bytes(payload_bytes.len() as u32);
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(payload_bytes);
+ bytes.extend(payload_bytes_length_bytes);
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+ "DataInvalid => Footer is not a valid UTF-8 string, source:
invalid utf-8 sequence of 1 bytes from index 1",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_minimal_valid_file_returns_file_metadata() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let mut bytes = vec![];
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(FileMetadata::MAGIC.to_vec());
+ bytes.extend(empty_footer_payload_bytes());
+ bytes.extend(empty_footer_payload_bytes_length_bytes());
+ bytes.extend(vec![0, 0, 0, 0]);
+ bytes.extend(FileMetadata::MAGIC);
+
+ let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap(),
+ FileMetadata {
+ blobs: vec![],
+ properties: HashMap::new(),
+ }
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_file_metadata_property() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ r#"{
+ "blobs" : [ ],
+ "properties" : {
+ "a property" : "a property value"
+ }
+ }"#,
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap(),
+ FileMetadata {
+ blobs: vec![],
+ properties: {
+ let mut map = HashMap::new();
+ map.insert("a property".to_string(), "a property
value".to_string());
+ map
+ },
+ }
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_file_metadata_properties() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ r#"{
+ "blobs" : [ ],
+ "properties" : {
+ "a property" : "a property value",
+ "another one": "also with value"
+ }
+ }"#,
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap(),
+ FileMetadata {
+ blobs: vec![],
+ properties: {
+ let mut map = HashMap::new();
+ map.insert("a property".to_string(), "a property
value".to_string());
+ map.insert("another one".to_string(), "also with
value".to_string());
+ map
+ },
+ }
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_error_if_blobs_field_is_missing() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ r#"{
+ "properties" : {}
+ }"#,
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+ format!(
+ "DataInvalid => Given string is not valid JSON, source:
missing field `blobs` at line 3 column 13"
+ ),
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_error_if_blobs_field_is_bad() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ r#"{
+ "blobs" : {}
+ }"#,
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+ format!("DataInvalid => Given string is not valid JSON, source:
invalid type: map, expected a sequence at line 2 column 26"),
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_blobs_metadatas() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ r#"{
+ "blobs" : [
+ {
+ "type" : "type-a",
+ "fields" : [ 1 ],
+ "snapshot-id" : 14,
+ "sequence-number" : 3,
+ "offset" : 4,
+ "length" : 16
+ },
+ {
+ "type" : "type-bbb",
+ "fields" : [ 2, 3, 4 ],
+ "snapshot-id" : 77,
+ "sequence-number" : 4,
+ "offset" : 21474836470000,
+ "length" : 79834
+ }
+ ]
+ }"#,
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap(),
+ FileMetadata {
+ blobs: vec![
+ BlobMetadata {
+ r#type: "type-a".to_string(),
+ fields: vec![1],
+ snapshot_id: 14,
+ sequence_number: 3,
+ offset: 4,
+ length: 16,
+ compression_codec: CompressionCodec::None,
+ properties: HashMap::new(),
+ },
+ BlobMetadata {
+ r#type: "type-bbb".to_string(),
+ fields: vec![2, 3, 4],
+ snapshot_id: 77,
+ sequence_number: 4,
+ offset: 21474836470000,
+ length: 79834,
+ compression_codec: CompressionCodec::None,
+ properties: HashMap::new(),
+ },
+ ],
+ properties: HashMap::new(),
+ }
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_properties_in_blob_metadata() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ r#"{
+ "blobs" : [
+ {
+ "type" : "type-a",
+ "fields" : [ 1 ],
+ "snapshot-id" : 14,
+ "sequence-number" : 3,
+ "offset" : 4,
+ "length" : 16,
+ "properties" : {
+ "some key" : "some value"
+ }
+ }
+ ]
+ }"#,
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap(),
+ FileMetadata {
+ blobs: vec![BlobMetadata {
+ r#type: "type-a".to_string(),
+ fields: vec![1],
+ snapshot_id: 14,
+ sequence_number: 3,
+ offset: 4,
+ length: 16,
+ compression_codec: CompressionCodec::None,
+ properties: {
+ let mut map = HashMap::new();
+ map.insert("some key".to_string(), "some
value".to_string());
+ map
+ },
+ }],
+ properties: HashMap::new(),
+ }
+ )
+ }
+
+ #[tokio::test]
+ async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;
+
+ let input_file = input_file_with_payload(
+ &temp_dir,
+ &format!(
+ r#"{{
+ "blobs" : [
+ {{
+ "type" : "type-a",
+ "fields" : [ {} ],
+ "snapshot-id" : 14,
+ "sequence-number" : 3,
+ "offset" : 4,
+ "length" : 16
+ }}
+ ]
+ }}"#,
+ out_of_i32_range_number
+ ),
+ )
+ .await;
+
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+ format!(
+ "DataInvalid => Given string is not valid JSON, source:
invalid value: integer `{}`, expected i32 at line 5 column 51",
+ out_of_i32_range_number
+ ),
+ )
+ }
+
+ #[tokio::test]
+ async fn
test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
+ let temp_dir = TempDir::new().unwrap();
+
+ let input_file = input_file_with_payload(&temp_dir, r#""blobs" =
[]"#).await;
+ assert_eq!(
+ FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+ "DataInvalid => Given string is not valid JSON, source: invalid
type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
+ )
+ }
+
+ #[tokio::test]
+ async fn test_read_file_metadata_of_uncompressed_empty_file() {
+ let input_file = java_empty_uncompressed_input_file();
+ let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+ assert_eq!(file_metadata, empty_footer_payload())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_metadata_of_uncompressed_metric_data() {
+ let input_file = java_uncompressed_metric_input_file();
+ let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+ assert_eq!(file_metadata, uncompressed_metric_file_metadata())
+ }
+
+ #[tokio::test]
+ async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
+ let input_file = java_zstd_compressed_metric_input_file();
+ let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+ assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
+ }
+}
diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs
index c13ebe42..91bdf125 100644
--- a/crates/iceberg/src/puffin/mod.rs
+++ b/crates/iceberg/src/puffin/mod.rs
@@ -22,3 +22,7 @@
#![allow(dead_code)]
mod compression;
+mod metadata;
+
+#[cfg(test)]
+mod test_utils;
diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs
new file mode 100644
index 00000000..e49e51d5
--- /dev/null
+++ b/crates/iceberg/src/puffin/test_utils.rs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use crate::io::{FileIOBuilder, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};
+
+const JAVA_TESTDATA: &str = "testdata/puffin/java-generated";
+const EMPTY_UNCOMPRESSED: &str = "empty-puffin-uncompressed.bin";
+const METRIC_UNCOMPRESSED: &str = "sample-metric-data-uncompressed.bin";
+const METRIC_ZSTD_COMPRESSED: &str = "sample-metric-data-compressed-zstd.bin";
+
+fn input_file_for_test_data(path: &str) -> InputFile {
+ FileIOBuilder::new_fs_io()
+ .build()
+ .unwrap()
+ .new_input(env!("CARGO_MANIFEST_DIR").to_owned() + "/" + path)
+ .unwrap()
+}
+
+pub(crate) fn java_empty_uncompressed_input_file() -> InputFile {
+ input_file_for_test_data(&[JAVA_TESTDATA, EMPTY_UNCOMPRESSED].join("/"))
+}
+
+pub(crate) fn java_uncompressed_metric_input_file() -> InputFile {
+ input_file_for_test_data(&[JAVA_TESTDATA, METRIC_UNCOMPRESSED].join("/"))
+}
+
+pub(crate) fn java_zstd_compressed_metric_input_file() -> InputFile {
+ input_file_for_test_data(&[JAVA_TESTDATA,
METRIC_ZSTD_COMPRESSED].join("/"))
+}
+
+pub(crate) fn empty_footer_payload() -> FileMetadata {
+ FileMetadata {
+ blobs: Vec::new(),
+ properties: HashMap::new(),
+ }
+}
+
+pub(crate) fn empty_footer_payload_bytes() -> Vec<u8> {
+ return serde_json::to_string::<FileMetadata>(&empty_footer_payload())
+ .unwrap()
+ .as_bytes()
+ .to_vec();
+}
+
+pub(crate) fn empty_footer_payload_bytes_length_bytes() -> [u8; 4] {
+ u32::to_le_bytes(empty_footer_payload_bytes().len() as u32)
+}
+
+pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob";
+pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1];
+pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2;
+pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1;
+
+pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
+ BlobMetadata {
+ r#type: METRIC_BLOB_0_TYPE.to_string(),
+ fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(),
+ snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
+ sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
+ offset: 4,
+ length: 22,
+ compression_codec: CompressionCodec::Zstd,
+ properties: HashMap::new(),
+ }
+}
+
+pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata {
+ BlobMetadata {
+ r#type: METRIC_BLOB_0_TYPE.to_string(),
+ fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(),
+ snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
+ sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
+ offset: 4,
+ length: 9,
+ compression_codec: CompressionCodec::None,
+ properties: HashMap::new(),
+ }
+}
+
+pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob";
+pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2];
+pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2;
+pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1;
+
+pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata {
+ BlobMetadata {
+ r#type: METRIC_BLOB_1_TYPE.to_string(),
+ fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(),
+ snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
+ sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
+ offset: 13,
+ length: 83,
+ compression_codec: CompressionCodec::None,
+ properties: HashMap::new(),
+ }
+}
+
+pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
+ BlobMetadata {
+ r#type: METRIC_BLOB_1_TYPE.to_string(),
+ fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(),
+ snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
+ sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
+ offset: 26,
+ length: 77,
+ compression_codec: CompressionCodec::Zstd,
+ properties: HashMap::new(),
+ }
+}
+
+pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234";
+
+pub(crate) fn file_properties() -> HashMap<String, String> {
+ let mut properties = HashMap::new();
+ properties.insert(
+ CREATED_BY_PROPERTY.to_string(),
+ CREATED_BY_PROPERTY_VALUE.to_string(),
+ );
+ properties
+}
+
+pub(crate) fn uncompressed_metric_file_metadata() -> FileMetadata {
+ FileMetadata {
+ blobs: vec![
+ uncompressed_metric_blob_0_metadata(),
+ uncompressed_metric_blob_1_metadata(),
+ ],
+ properties: file_properties(),
+ }
+}
+
+pub(crate) fn zstd_compressed_metric_file_metadata() -> FileMetadata {
+ FileMetadata {
+ blobs: vec![
+ zstd_compressed_metric_blob_0_metadata(),
+ zstd_compressed_metric_blob_1_metadata(),
+ ],
+ properties: file_properties(),
+ }
+}
diff --git a/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin
new file mode 100644
index 00000000..142b45bd
Binary files /dev/null and b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin differ
diff --git a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin
new file mode 100644
index 00000000..ac8b69c7
Binary files /dev/null and b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin differ
diff --git a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin
new file mode 100644
index 00000000..ab8da138
Binary files /dev/null and b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin differ