This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7addc3f5 feat(puffin): Parse Puffin FileMetadata (#765)
7addc3f5 is described below

commit 7addc3f54bccf24a68297f9c8980086e4f63f2d0
Author: Farooq Qaiser <[email protected]>
AuthorDate: Mon Jan 13 21:14:58 2025 -0500

    feat(puffin): Parse Puffin FileMetadata (#765)
    
    * Add Puffin FileMetadata
    
    * Fix comment locations
    
    * Put Ok(()) branch first
    
    * Use map_err
    
    * Inline err_out_of_bounds function
    
    * Use ok_or_else
    
    * Remove #[rustfmt::skip]
    
    * Rename input_fields to fields
    
    * Simplify flag parsing
    
    * Remove unnecessary reference
    
    * Make BlobMetadata.length a u64 (instead of usize)
    
    * Replace from with as
---
 crates/iceberg/src/puffin/compression.rs           |   5 +-
 crates/iceberg/src/puffin/metadata.rs              | 777 +++++++++++++++++++++
 crates/iceberg/src/puffin/mod.rs                   |   4 +
 crates/iceberg/src/puffin/test_utils.rs            | 158 +++++
 .../java-generated/empty-puffin-uncompressed.bin   | Bin 0 -> 32 bytes
 .../sample-metric-data-compressed-zstd.bin         | Bin 0 -> 417 bytes
 .../sample-metric-data-uncompressed.bin            | Bin 0 -> 355 bytes
 7 files changed, 943 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/puffin/compression.rs 
b/crates/iceberg/src/puffin/compression.rs
index 710698df..a9a56ef1 100644
--- a/crates/iceberg/src/puffin/compression.rs
+++ b/crates/iceberg/src/puffin/compression.rs
@@ -15,10 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use serde::{Deserialize, Serialize};
+
 use crate::{Error, ErrorKind, Result};
 
-#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
 /// Data compression formats
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
 pub enum CompressionCodec {
     #[default]
     /// No compression
diff --git a/crates/iceberg/src/puffin/metadata.rs 
b/crates/iceberg/src/puffin/metadata.rs
new file mode 100644
index 00000000..9d000322
--- /dev/null
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -0,0 +1,777 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along 
with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {
+    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
+    pub(crate) r#type: String,
+    /// List of field IDs the blob was computed for; the order of items is 
used to compute sketches stored in the blob.
+    pub(crate) fields: Vec<i32>,
+    /// ID of the Iceberg table's snapshot the blob was computed from
+    pub(crate) snapshot_id: i64,
+    /// Sequence number of the Iceberg table's snapshot the blob was computed 
from
+    pub(crate) sequence_number: i64,
+    /// The offset in the file where the blob contents start
+    pub(crate) offset: u64,
+    /// The length of the blob stored in the file (after compression, if 
compressed)
+    pub(crate) length: u64,
+    /// The compression codec used to compress the data
+    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
+    #[serde(default)]
+    pub(crate) compression_codec: CompressionCodec,
+    /// Arbitrary meta-information about the blob
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    #[serde(default)]
+    pub(crate) properties: HashMap<String, String>,
+}
+
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
+pub(crate) enum Flag {
+    FooterPayloadCompressed = 0,
+}
+
+impl Flag {
+    pub(crate) fn byte_idx(self) -> u8 {
+        (self as u8) / 8
+    }
+
+    pub(crate) fn bit_idx(self) -> u8 {
+        (self as u8) % 8
+    }
+
+    fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
+        self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
+    }
+
+    fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
+        if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
+            Ok(Flag::FooterPayloadCompressed)
+        } else {
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Unknown flag byte {} and bit {} combination",
+                    byte_idx, bit_idx
+                ),
+            ))
+        }
+    }
+}
+
+/// Metadata about a puffin file.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#filemetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+pub(crate) struct FileMetadata {
+    /// Metadata about blobs in file
+    pub(crate) blobs: Vec<BlobMetadata>,
+    /// Arbitrary meta-information, like writer identification/version.
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    #[serde(default)]
+    pub(crate) properties: HashMap<String, String>,
+}
+
+impl FileMetadata {
+    pub(crate) const MAGIC_LENGTH: u8 = 4;
+    pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 
0x46, 0x41, 0x31];
+
+    // We use the term FOOTER_STRUCT to refer to the fixed-length portion of 
the Footer, as illustrated below.
+    //
+    //                        Footer
+    //                          |
+    //  -------------------------------------------------
+    // |                                                 |
+    // Magic FooterPayload FooterPayloadLength Flags Magic
+    //                     |                             |
+    //                      -----------------------------
+    //                                    |
+    //                              FOOTER_STRUCT
+
+    const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
+    const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
+    const FOOTER_STRUCT_FLAGS_OFFSET: u8 = 
FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
+        + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
+    pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
+    const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
+        FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + 
FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
+    pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
+        FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
+
+    fn check_magic(bytes: &[u8]) -> Result<()> {
+        if bytes == FileMetadata::MAGIC {
+            Ok(())
+        } else {
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Bad magic value: {:?} should be {:?}",
+                    bytes,
+                    FileMetadata::MAGIC
+                ),
+            ))
+        }
+    }
+
+    async fn read_footer_payload_length(
+        file_read: &dyn FileRead,
+        input_file_length: u64,
+    ) -> Result<u32> {
+        let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as 
u64;
+        let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as 
u64;
+        let footer_payload_length_bytes = file_read.read(start..end).await?;
+        let mut buf = [0; 4];
+        buf.copy_from_slice(&footer_payload_length_bytes);
+        let footer_payload_length = u32::from_le_bytes(buf);
+        Ok(footer_payload_length)
+    }
+
+    async fn read_footer_bytes(
+        file_read: &dyn FileRead,
+        input_file_length: u64,
+        footer_payload_length: u32,
+    ) -> Result<Bytes> {
+        let footer_length = footer_payload_length as u64
+            + FileMetadata::FOOTER_STRUCT_LENGTH as u64
+            + FileMetadata::MAGIC_LENGTH as u64;
+        let start = input_file_length - footer_length;
+        let end = input_file_length;
+        file_read.read(start..end).await
+    }
+
+    fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
+        let mut flags = HashSet::new();
+
+        for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
+            let byte_offset = footer_bytes.len()
+                - FileMetadata::MAGIC_LENGTH as usize
+                - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
+                + byte_idx as usize;
+
+            let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
+                Error::new(ErrorKind::DataInvalid, "Index range is out of 
bounds.")
+            })?;
+
+            for bit_idx in 0..8 {
+                if ((flag_byte >> bit_idx) & 1) != 0 {
+                    let flag = Flag::from(byte_idx, bit_idx)?;
+                    flags.insert(flag);
+                }
+            }
+        }
+
+        Ok(flags)
+    }
+
+    fn extract_footer_payload_as_str(
+        footer_bytes: &[u8],
+        footer_payload_length: u32,
+    ) -> Result<String> {
+        let flags = FileMetadata::decode_flags(footer_bytes)?;
+        let footer_compression_codec = if 
flags.contains(&Flag::FooterPayloadCompressed) {
+            CompressionCodec::Lz4
+        } else {
+            CompressionCodec::None
+        };
+
+        let start_offset = FileMetadata::MAGIC_LENGTH as usize;
+        let end_offset =
+            FileMetadata::MAGIC_LENGTH as usize + 
usize::try_from(footer_payload_length)?;
+        let footer_payload_bytes = footer_bytes
+            .get(start_offset..end_offset)
+            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is 
out of bounds."))?;
+        let decompressed_footer_payload_bytes =
+            footer_compression_codec.decompress(footer_payload_bytes.into())?;
+
+        String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
+            Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 
string")
+                .with_source(src)
+        })
+    }
+
+    fn from_json_str(string: &str) -> Result<FileMetadata> {
+        serde_json::from_str::<FileMetadata>(string).map_err(|src| {
+            Error::new(ErrorKind::DataInvalid, "Given string is not valid 
JSON").with_source(src)
+        })
+    }
+
+    /// Returns the file metadata about a Puffin file
+    pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
+        let file_read = input_file.reader().await?;
+
+        let first_four_bytes = 
file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
+        FileMetadata::check_magic(&first_four_bytes)?;
+
+        let input_file_length = input_file.metadata().await?.size;
+        let footer_payload_length =
+            FileMetadata::read_footer_payload_length(&file_read, 
input_file_length).await?;
+        let footer_bytes =
+            FileMetadata::read_footer_bytes(&file_read, input_file_length, 
footer_payload_length)
+                .await?;
+
+        let magic_length = FileMetadata::MAGIC_LENGTH as usize;
+        // check first four bytes of footer
+        FileMetadata::check_magic(&footer_bytes[..magic_length])?;
+        // check last four bytes of footer
+        FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - 
magic_length..])?;
+
+        let footer_payload_str =
+            FileMetadata::extract_footer_payload_as_str(&footer_bytes, 
footer_payload_length)?;
+        FileMetadata::from_json_str(&footer_payload_str)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use bytes::Bytes;
+    use tempfile::TempDir;
+
+    use crate::io::{FileIOBuilder, InputFile};
+    use crate::puffin::metadata::{BlobMetadata, CompressionCodec, 
FileMetadata};
+    use crate::puffin::test_utils::{
+        empty_footer_payload, empty_footer_payload_bytes, 
empty_footer_payload_bytes_length_bytes,
+        java_empty_uncompressed_input_file, 
java_uncompressed_metric_input_file,
+        java_zstd_compressed_metric_input_file, 
uncompressed_metric_file_metadata,
+        zstd_compressed_metric_file_metadata,
+    };
+
+    const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];
+
+    async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> 
InputFile {
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        let path_buf = temp_dir.path().join("abc.puffin");
+        let temp_path = path_buf.to_str().unwrap();
+        let output_file = file_io.new_output(temp_path).unwrap();
+
+        output_file
+            .write(Bytes::copy_from_slice(slice))
+            .await
+            .unwrap();
+
+        output_file.to_input_file()
+    }
+
+    async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> 
InputFile {
+        let payload_bytes = payload_str.as_bytes();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(payload_bytes);
+        bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32));
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC);
+
+        input_file_with_bytes(temp_dir, &bytes).await
+    }
+
+    #[tokio::test]
+    async fn test_file_starting_with_invalid_magic_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(empty_footer_payload_bytes_length_bytes());
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC);
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 
70, 65, 49]",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(empty_footer_payload_bytes_length_bytes());
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC);
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 
70, 65, 49]",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_file_ending_with_invalid_magic_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(empty_footer_payload_bytes_length_bytes());
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(INVALID_MAGIC_VALUE);
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 
70, 65, 49]",
+        )
+    }
+
+    #[tokio::test]
+    async fn 
test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(u32::to_le_bytes(
+            empty_footer_payload_bytes().len() as u32 + 1,
+        ));
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 
70, 65, 49]",
+        )
+    }
+
+    #[tokio::test]
+    async fn 
test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(u32::to_le_bytes(
+            empty_footer_payload_bytes().len() as u32 - 1,
+        ));
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 
70, 65, 49]",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_lz4_compressed_footer_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(empty_footer_payload_bytes_length_bytes());
+        bytes.extend(vec![0b00000001, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "FeatureUnsupported => LZ4 decompression is not supported 
currently",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_unknown_byte_bit_combination_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(empty_footer_payload_bytes_length_bytes());
+        bytes.extend(vec![0b00000010, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file)
+                .await
+                .unwrap_err()
+                .to_string(),
+            "DataInvalid => Unknown flag byte 0 and bit 1 combination",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_non_utf8_string_payload_returns_error() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let payload_bytes: [u8; 4] = [0, 159, 146, 150];
+        let payload_bytes_length_bytes: [u8; 4] = 
u32::to_le_bytes(payload_bytes.len() as u32);
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(payload_bytes);
+        bytes.extend(payload_bytes_length_bytes);
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+            "DataInvalid => Footer is not a valid UTF-8 string, source: 
invalid utf-8 sequence of 1 bytes from index 1",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_minimal_valid_file_returns_file_metadata() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let mut bytes = vec![];
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(FileMetadata::MAGIC.to_vec());
+        bytes.extend(empty_footer_payload_bytes());
+        bytes.extend(empty_footer_payload_bytes_length_bytes());
+        bytes.extend(vec![0, 0, 0, 0]);
+        bytes.extend(FileMetadata::MAGIC);
+
+        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap(),
+            FileMetadata {
+                blobs: vec![],
+                properties: HashMap::new(),
+            }
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_file_metadata_property() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            r#"{
+                "blobs" : [ ],
+                "properties" : {
+                    "a property" : "a property value"
+                }
+            }"#,
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap(),
+            FileMetadata {
+                blobs: vec![],
+                properties: {
+                    let mut map = HashMap::new();
+                    map.insert("a property".to_string(), "a property 
value".to_string());
+                    map
+                },
+            }
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_file_metadata_properties() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            r#"{
+                "blobs" : [ ],
+                "properties" : {
+                    "a property" : "a property value",
+                    "another one": "also with value"
+                }
+            }"#,
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap(),
+            FileMetadata {
+                blobs: vec![],
+                properties: {
+                    let mut map = HashMap::new();
+                    map.insert("a property".to_string(), "a property 
value".to_string());
+                    map.insert("another one".to_string(), "also with 
value".to_string());
+                    map
+                },
+            }
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_error_if_blobs_field_is_missing() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            r#"{
+                "properties" : {}
+            }"#,
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+            format!(
+                "DataInvalid => Given string is not valid JSON, source: 
missing field `blobs` at line 3 column 13"
+            ),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_error_if_blobs_field_is_bad() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            r#"{
+                "blobs" : {}
+            }"#,
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+            format!("DataInvalid => Given string is not valid JSON, source: 
invalid type: map, expected a sequence at line 2 column 26"),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_blobs_metadatas() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            r#"{
+                "blobs" : [
+                    {
+                        "type" : "type-a",
+                        "fields" : [ 1 ],
+                        "snapshot-id" : 14,
+                        "sequence-number" : 3,
+                        "offset" : 4,
+                        "length" : 16
+                    },
+                    {
+                        "type" : "type-bbb",
+                        "fields" : [ 2, 3, 4 ],
+                        "snapshot-id" : 77,
+                        "sequence-number" : 4,
+                        "offset" : 21474836470000,
+                        "length" : 79834
+                    }
+                ]
+            }"#,
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap(),
+            FileMetadata {
+                blobs: vec![
+                    BlobMetadata {
+                        r#type: "type-a".to_string(),
+                        fields: vec![1],
+                        snapshot_id: 14,
+                        sequence_number: 3,
+                        offset: 4,
+                        length: 16,
+                        compression_codec: CompressionCodec::None,
+                        properties: HashMap::new(),
+                    },
+                    BlobMetadata {
+                        r#type: "type-bbb".to_string(),
+                        fields: vec![2, 3, 4],
+                        snapshot_id: 77,
+                        sequence_number: 4,
+                        offset: 21474836470000,
+                        length: 79834,
+                        compression_codec: CompressionCodec::None,
+                        properties: HashMap::new(),
+                    },
+                ],
+                properties: HashMap::new(),
+            }
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_properties_in_blob_metadata() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            r#"{
+                "blobs" : [
+                    {
+                        "type" : "type-a",
+                        "fields" : [ 1 ],
+                        "snapshot-id" : 14,
+                        "sequence-number" : 3,
+                        "offset" : 4,
+                        "length" : 16,
+                        "properties" : {
+                            "some key" : "some value"
+                        }
+                    }
+                ]
+            }"#,
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap(),
+            FileMetadata {
+                blobs: vec![BlobMetadata {
+                    r#type: "type-a".to_string(),
+                    fields: vec![1],
+                    snapshot_id: 14,
+                    sequence_number: 3,
+                    offset: 4,
+                    length: 16,
+                    compression_codec: CompressionCodec::None,
+                    properties: {
+                        let mut map = HashMap::new();
+                        map.insert("some key".to_string(), "some 
value".to_string());
+                        map
+                    },
+                }],
+                properties: HashMap::new(),
+            }
+        )
+    }
+
+    #[tokio::test]
+    async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;
+
+        let input_file = input_file_with_payload(
+            &temp_dir,
+            &format!(
+                r#"{{
+                    "blobs" : [
+                        {{
+                            "type" : "type-a",
+                            "fields" : [ {} ],
+                            "snapshot-id" : 14,
+                            "sequence-number" : 3,
+                            "offset" : 4,
+                            "length" : 16
+                        }}
+                    ]
+                }}"#,
+                out_of_i32_range_number
+            ),
+        )
+        .await;
+
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+            format!(
+                "DataInvalid => Given string is not valid JSON, source: 
invalid value: integer `{}`, expected i32 at line 5 column 51",
+                out_of_i32_range_number
+            ),
+        )
+    }
+
+    #[tokio::test]
+    async fn 
test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
+        let temp_dir = TempDir::new().unwrap();
+
+        let input_file = input_file_with_payload(&temp_dir, r#""blobs" = 
[]"#).await;
+        assert_eq!(
+            FileMetadata::read(&input_file).await.unwrap_err().to_string(),
+            "DataInvalid => Given string is not valid JSON, source: invalid 
type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_read_file_metadata_of_uncompressed_empty_file() {
+        let input_file = java_empty_uncompressed_input_file();
+        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+        assert_eq!(file_metadata, empty_footer_payload())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_metadata_of_uncompressed_metric_data() {
+        let input_file = java_uncompressed_metric_input_file();
+        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+        assert_eq!(file_metadata, uncompressed_metric_file_metadata())
+    }
+
+    #[tokio::test]
+    async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
+        let input_file = java_zstd_compressed_metric_input_file();
+        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
+    }
+}
diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs
index c13ebe42..91bdf125 100644
--- a/crates/iceberg/src/puffin/mod.rs
+++ b/crates/iceberg/src/puffin/mod.rs
@@ -22,3 +22,7 @@
 #![allow(dead_code)]
 
 mod compression;
+mod metadata;
+
+#[cfg(test)]
+mod test_utils;
diff --git a/crates/iceberg/src/puffin/test_utils.rs 
b/crates/iceberg/src/puffin/test_utils.rs
new file mode 100644
index 00000000..e49e51d5
--- /dev/null
+++ b/crates/iceberg/src/puffin/test_utils.rs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use crate::io::{FileIOBuilder, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};
+
// Locations (relative to the crate root) of Puffin fixture files generated by
// the Java reference implementation, used for cross-implementation testing.
const JAVA_TESTDATA: &str = "testdata/puffin/java-generated";
const EMPTY_UNCOMPRESSED: &str = "empty-puffin-uncompressed.bin";
const METRIC_UNCOMPRESSED: &str = "sample-metric-data-uncompressed.bin";
const METRIC_ZSTD_COMPRESSED: &str = "sample-metric-data-compressed-zstd.bin";
+
+fn input_file_for_test_data(path: &str) -> InputFile {
+    FileIOBuilder::new_fs_io()
+        .build()
+        .unwrap()
+        .new_input(env!("CARGO_MANIFEST_DIR").to_owned() + "/" + path)
+        .unwrap()
+}
+
+pub(crate) fn java_empty_uncompressed_input_file() -> InputFile {
+    input_file_for_test_data(&[JAVA_TESTDATA, EMPTY_UNCOMPRESSED].join("/"))
+}
+
+pub(crate) fn java_uncompressed_metric_input_file() -> InputFile {
+    input_file_for_test_data(&[JAVA_TESTDATA, METRIC_UNCOMPRESSED].join("/"))
+}
+
+pub(crate) fn java_zstd_compressed_metric_input_file() -> InputFile {
+    input_file_for_test_data(&[JAVA_TESTDATA, 
METRIC_ZSTD_COMPRESSED].join("/"))
+}
+
+pub(crate) fn empty_footer_payload() -> FileMetadata {
+    FileMetadata {
+        blobs: Vec::new(),
+        properties: HashMap::new(),
+    }
+}
+
+pub(crate) fn empty_footer_payload_bytes() -> Vec<u8> {
+    return serde_json::to_string::<FileMetadata>(&empty_footer_payload())
+        .unwrap()
+        .as_bytes()
+        .to_vec();
+}
+
+pub(crate) fn empty_footer_payload_bytes_length_bytes() -> [u8; 4] {
+    u32::to_le_bytes(empty_footer_payload_bytes().len() as u32)
+}
+
// Expected attributes of the first blob in the Java-generated metric sample
// files (shared by the compressed and uncompressed fixtures).
pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob";
pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1];
pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2;
pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1;
+
+pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
+    BlobMetadata {
+        r#type: METRIC_BLOB_0_TYPE.to_string(),
+        fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(),
+        snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
+        sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
+        offset: 4,
+        length: 22,
+        compression_codec: CompressionCodec::Zstd,
+        properties: HashMap::new(),
+    }
+}
+
+pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata {
+    BlobMetadata {
+        r#type: METRIC_BLOB_0_TYPE.to_string(),
+        fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(),
+        snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
+        sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
+        offset: 4,
+        length: 9,
+        compression_codec: CompressionCodec::None,
+        properties: HashMap::new(),
+    }
+}
+
// Expected attributes of the second blob in the Java-generated metric sample
// files (shared by the compressed and uncompressed fixtures).
pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob";
pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2];
pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2;
pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1;
+
+pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata {
+    BlobMetadata {
+        r#type: METRIC_BLOB_1_TYPE.to_string(),
+        fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(),
+        snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
+        sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
+        offset: 13,
+        length: 83,
+        compression_codec: CompressionCodec::None,
+        properties: HashMap::new(),
+    }
+}
+
+pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
+    BlobMetadata {
+        r#type: METRIC_BLOB_1_TYPE.to_string(),
+        fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(),
+        snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
+        sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
+        offset: 26,
+        length: 77,
+        compression_codec: CompressionCodec::Zstd,
+        properties: HashMap::new(),
+    }
+}
+
+pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234";
+
+pub(crate) fn file_properties() -> HashMap<String, String> {
+    let mut properties = HashMap::new();
+    properties.insert(
+        CREATED_BY_PROPERTY.to_string(),
+        CREATED_BY_PROPERTY_VALUE.to_string(),
+    );
+    properties
+}
+
+pub(crate) fn uncompressed_metric_file_metadata() -> FileMetadata {
+    FileMetadata {
+        blobs: vec![
+            uncompressed_metric_blob_0_metadata(),
+            uncompressed_metric_blob_1_metadata(),
+        ],
+        properties: file_properties(),
+    }
+}
+
+pub(crate) fn zstd_compressed_metric_file_metadata() -> FileMetadata {
+    FileMetadata {
+        blobs: vec![
+            zstd_compressed_metric_blob_0_metadata(),
+            zstd_compressed_metric_blob_1_metadata(),
+        ],
+        properties: file_properties(),
+    }
+}
diff --git 
a/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin 
b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin
new file mode 100644
index 00000000..142b45bd
Binary files /dev/null and 
b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin 
differ
diff --git 
a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin
 
b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin
new file mode 100644
index 00000000..ac8b69c7
Binary files /dev/null and 
b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin
 differ
diff --git 
a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin
 
b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin
new file mode 100644
index 00000000..ab8da138
Binary files /dev/null and 
b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin
 differ


Reply via email to