This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b6dc4cfb feat: Support for V3 Metadata (#1682)
b6dc4cfb is described below
commit b6dc4cfb8bd68c777a14e73afcbd4cf8a1a093e1
Author: Christian <[email protected]>
AuthorDate: Tue Nov 4 17:35:46 2025 +0100
feat: Support for V3 Metadata (#1682)
## Which issue does this PR close?
Towards V3 Support!
## What changes are included in this PR?
Introduce V3 FormatVersion across Iceberg Metadata.
## Are these changes tested?
Yes. Java has [a few more
tests](https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java#L36)
regarding other table operations (different deletes, remove file), but
we don't have those yet in our Transaction interface.
---------
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
---
crates/iceberg/src/catalog/memory/catalog.rs | 4 +-
crates/iceberg/src/catalog/mod.rs | 219 ++++++-
.../src/expr/visitors/manifest_evaluator.rs | 1 +
crates/iceberg/src/spec/encrypted_key.rs | 16 +-
crates/iceberg/src/spec/manifest/data_file.rs | 6 +-
crates/iceberg/src/spec/manifest/entry.rs | 38 ++
crates/iceberg/src/spec/manifest/mod.rs | 3 +-
crates/iceberg/src/spec/manifest/writer.rs | 76 ++-
crates/iceberg/src/spec/manifest_list.rs | 671 ++++++++++++++++++++-
crates/iceberg/src/spec/snapshot.rs | 138 ++++-
crates/iceberg/src/spec/snapshot_summary.rs | 2 +
crates/iceberg/src/spec/table_metadata.rs | 541 ++++++++++++++++-
crates/iceberg/src/spec/table_metadata_builder.rs | 453 +++++++++++++-
crates/iceberg/src/transaction/mod.rs | 122 +++-
crates/iceberg/src/transaction/snapshot.rs | 36 +-
.../src/writer/file_writer/location_generator.rs | 2 +
.../TableMetadataV3ValidMinimal.json | 74 +++
17 files changed, 2276 insertions(+), 126 deletions(-)
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs
b/crates/iceberg/src/catalog/memory/catalog.rs
index fdb495f6..cfa3dc6b 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
}
#[cfg(test)]
-mod tests {
+pub(crate) mod tests {
use std::collections::HashSet;
use std::hash::Hash;
use std::iter::FromIterator;
@@ -396,7 +396,7 @@ mod tests {
temp_dir.path().to_str().unwrap().to_string()
}
- async fn new_memory_catalog() -> impl Catalog {
+ pub(crate) async fn new_memory_catalog() -> impl Catalog {
let warehouse_location = temp_path();
MemoryCatalogBuilder::default()
.load(
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index ec4b77fe..27d5edae 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -28,7 +28,7 @@ use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;
-use _serde::deserialize_snapshot;
+use _serde::{deserialize_snapshot, serialize_snapshot};
use async_trait::async_trait;
pub use memory::MemoryCatalog;
pub use metadata_location::*;
@@ -39,9 +39,9 @@ use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::spec::{
- FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
SnapshotReference,
- SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
UnboundPartitionSpec,
- ViewFormatVersion, ViewRepresentations, ViewVersion,
+ EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId,
Snapshot,
+ SnapshotReference, SortOrder, StatisticsFile, TableMetadata,
TableMetadataBuilder,
+ UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
@@ -291,6 +291,9 @@ pub struct TableCreation {
props.into_iter().collect()
}))]
pub properties: HashMap<String, String>,
+ /// Format version of the table. Defaults to V2.
+ #[builder(default = FormatVersion::V2)]
+ pub format_version: FormatVersion,
}
/// TableCommit represents the commit of a table in the catalog.
@@ -479,7 +482,10 @@ pub enum TableUpdate {
#[serde(rename_all = "kebab-case")]
AddSnapshot {
/// Snapshot to add.
- #[serde(deserialize_with = "deserialize_snapshot")]
+ #[serde(
+ deserialize_with = "deserialize_snapshot",
+ serialize_with = "serialize_snapshot"
+ )]
snapshot: Snapshot,
},
/// Set table's snapshot ref.
@@ -554,6 +560,18 @@ pub enum TableUpdate {
/// Schema IDs to remove.
schema_ids: Vec<i32>,
},
+ /// Add an encryption key
+ #[serde(rename_all = "kebab-case")]
+ AddEncryptionKey {
+ /// The encryption key to add.
+ encryption_key: EncryptedKey,
+ },
+ /// Remove an encryption key
+ #[serde(rename_all = "kebab-case")]
+ RemoveEncryptionKey {
+ /// The id of the encryption key to remove.
+ key_id: String,
+ },
}
impl TableUpdate {
@@ -598,6 +616,12 @@ impl TableUpdate {
Ok(builder.remove_partition_statistics(snapshot_id))
}
TableUpdate::RemoveSchemas { schema_ids } =>
builder.remove_schemas(&schema_ids),
+ TableUpdate::AddEncryptionKey { encryption_key } => {
+ Ok(builder.add_encryption_key(encryption_key))
+ }
+ TableUpdate::RemoveEncryptionKey { key_id } => {
+ Ok(builder.remove_encryption_key(&key_id))
+ }
}
}
}
@@ -742,7 +766,7 @@ impl TableRequirement {
}
pub(super) mod _serde {
- use serde::{Deserialize as _, Deserializer};
+ use serde::{Deserialize as _, Deserializer, Serialize as _};
use super::*;
use crate::spec::{SchemaId, Summary};
@@ -755,7 +779,18 @@ pub(super) mod _serde {
Ok(buf.into())
}
- #[derive(Debug, Deserialize, PartialEq, Eq)]
+ pub(super) fn serialize_snapshot<S>(
+ snapshot: &Snapshot,
+ serializer: S,
+ ) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ let buf: CatalogSnapshot = snapshot.clone().into();
+ buf.serialize(serializer)
+ }
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Defines the structure of a v2 snapshot for the catalog.
/// Main difference to SnapshotV2 is that sequence-number is optional
@@ -771,6 +806,12 @@ pub(super) mod _serde {
summary: Summary,
#[serde(skip_serializing_if = "Option::is_none")]
schema_id: Option<SchemaId>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ first_row_id: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ added_rows: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ key_id: Option<String>,
}
impl From<CatalogSnapshot> for Snapshot {
@@ -783,6 +824,9 @@ pub(super) mod _serde {
manifest_list,
schema_id,
summary,
+ first_row_id,
+ added_rows,
+ key_id,
} = snapshot;
let builder = Snapshot::builder()
.with_snapshot_id(snapshot_id)
@@ -790,11 +834,49 @@ pub(super) mod _serde {
.with_sequence_number(sequence_number)
.with_timestamp_ms(timestamp_ms)
.with_manifest_list(manifest_list)
- .with_summary(summary);
- if let Some(schema_id) = schema_id {
- builder.with_schema_id(schema_id).build()
- } else {
- builder.build()
+ .with_summary(summary)
+ .with_encryption_key_id(key_id);
+ let row_range = first_row_id.zip(added_rows);
+ match (schema_id, row_range) {
+ (None, None) => builder.build(),
+ (Some(schema_id), None) =>
builder.with_schema_id(schema_id).build(),
+ (None, Some((first_row_id, last_row_id))) => {
+ builder.with_row_range(first_row_id, last_row_id).build()
+ }
+ (Some(schema_id), Some((first_row_id, last_row_id))) => builder
+ .with_schema_id(schema_id)
+ .with_row_range(first_row_id, last_row_id)
+ .build(),
+ }
+ }
+ }
+
+ impl From<Snapshot> for CatalogSnapshot {
+ fn from(snapshot: Snapshot) -> Self {
+ let first_row_id = snapshot.first_row_id();
+ let added_rows = snapshot.added_rows_count();
+ let Snapshot {
+ snapshot_id,
+ parent_snapshot_id,
+ sequence_number,
+ timestamp_ms,
+ manifest_list,
+ summary,
+ schema_id,
+ row_range: _,
+ encryption_key_id: key_id,
+ } = snapshot;
+ CatalogSnapshot {
+ snapshot_id,
+ parent_snapshot_id,
+ sequence_number,
+ timestamp_ms,
+ manifest_list,
+ summary,
+ schema_id,
+ first_row_id,
+ added_rows,
+ key_id,
}
}
}
@@ -938,6 +1020,7 @@ mod tests {
use std::fs::File;
use std::io::BufReader;
+ use base64::Engine as _;
use serde::Serialize;
use serde::de::DeserializeOwned;
use uuid::uuid;
@@ -945,7 +1028,7 @@ mod tests {
use super::ViewUpdate;
use crate::io::FileIOBuilder;
use crate::spec::{
- BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder,
Operation,
+ BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField,
NullOrder, Operation,
PartitionStatisticsFile, PrimitiveType, Schema, Snapshot,
SnapshotReference,
SnapshotRetention, SortDirection, SortField, SortOrder,
SqlViewRepresentation,
StatisticsFile, Summary, TableMetadata, TableMetadataBuilder,
Transform, Type,
@@ -1075,20 +1158,18 @@ mod tests {
assert!(requirement.check(Some(&metadata)).is_ok());
// Add snapshot
- let record = r#"
- {
- "snapshot-id": 3051729675574597004,
- "sequence-number": 10,
- "timestamp-ms": 9992191116217,
- "summary": {
- "operation": "append"
- },
- "manifest-list": "s3://b/wh/.../s1.avro",
- "schema-id": 0
- }
- "#;
+ let snapshot = Snapshot::builder()
+ .with_snapshot_id(3051729675574597004)
+ .with_sequence_number(10)
+ .with_timestamp_ms(9992191116217)
+ .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
+ .with_schema_id(0)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .build();
- let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
let builder = metadata.into_builder(None);
let builder = TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
@@ -1666,6 +1747,50 @@ mod tests {
assert_eq!(actual, update, "Parsed value is not equal to expected");
}
+ #[test]
+ fn test_add_snapshot_v3() {
+ let json = serde_json::json!(
+ {
+ "action": "add-snapshot",
+ "snapshot": {
+ "snapshot-id": 3055729675574597000i64,
+ "parent-snapshot-id": 3051729675574597000i64,
+ "timestamp-ms": 1555100955770i64,
+ "first-row-id":0,
+ "added-rows":2,
+ "key-id":"key123",
+ "summary": {
+ "operation": "append"
+ },
+ "manifest-list": "s3://a/b/2.avro"
+ }
+ });
+
+ let update = TableUpdate::AddSnapshot {
+ snapshot: Snapshot::builder()
+ .with_snapshot_id(3055729675574597000)
+ .with_parent_snapshot_id(Some(3051729675574597000))
+ .with_timestamp_ms(1555100955770)
+ .with_sequence_number(0)
+ .with_manifest_list("s3://a/b/2.avro")
+ .with_row_range(0, 2)
+ .with_encryption_key_id(Some("key123".to_string()))
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::default(),
+ })
+ .build(),
+ };
+
+ let actual: TableUpdate = serde_json::from_value(json).expect("Failed
to parse from json");
+ assert_eq!(actual, update, "Parsed value is not equal to expected");
+ let restored: TableUpdate = serde_json::from_str(
+ &serde_json::to_string(&actual).expect("Failed to serialize to
json"),
+ )
+ .expect("Failed to parse from serialized json");
+ assert_eq!(restored, update);
+ }
+
#[test]
fn test_remove_snapshots() {
let json = r#"
@@ -2169,6 +2294,48 @@ mod tests {
);
}
+ #[test]
+ fn test_add_encryption_key() {
+ let key_bytes = "key".as_bytes();
+ let encoded_key =
base64::engine::general_purpose::STANDARD.encode(key_bytes);
+ test_serde_json(
+ format!(
+ r#"
+ {{
+ "action": "add-encryption-key",
+ "encryption-key": {{
+ "key-id": "a",
+ "encrypted-key-metadata": "{encoded_key}",
+ "encrypted-by-id": "b"
+ }}
+ }}
+ "#
+ ),
+ TableUpdate::AddEncryptionKey {
+ encryption_key: EncryptedKey::builder()
+ .key_id("a")
+ .encrypted_key_metadata(key_bytes.to_vec())
+ .encrypted_by_id("b")
+ .build(),
+ },
+ );
+ }
+
+ #[test]
+ fn test_remove_encryption_key() {
+ test_serde_json(
+ r#"
+ {
+ "action": "remove-encryption-key",
+ "key-id": "a"
+ }
+ "#,
+ TableUpdate::RemoveEncryptionKey {
+ key_id: "a".to_string(),
+ },
+ );
+ }
+
#[test]
fn test_table_commit() {
let table = {
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 903c9ea4..abbd136c 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -703,6 +703,7 @@ mod test {
deleted_rows_count: None,
partitions: Some(partitions),
key_metadata: None,
+ first_row_id: None,
}
}
diff --git a/crates/iceberg/src/spec/encrypted_key.rs
b/crates/iceberg/src/spec/encrypted_key.rs
index db19a023..6908ade1 100644
--- a/crates/iceberg/src/spec/encrypted_key.rs
+++ b/crates/iceberg/src/spec/encrypted_key.rs
@@ -26,16 +26,16 @@ use serde::{Deserialize, Serialize};
pub struct EncryptedKey {
/// Unique identifier for the key
#[builder(setter(into))]
- key_id: String,
+ pub(crate) key_id: String,
/// Encrypted key metadata as binary data
#[builder(setter(into))]
- encrypted_key_metadata: Vec<u8>,
+ pub(crate) encrypted_key_metadata: Vec<u8>,
/// Identifier of the entity that encrypted this key
- #[builder(setter(into))]
- encrypted_by_id: String,
+ #[builder(default, setter(into, strip_option))]
+ pub(crate) encrypted_by_id: Option<String>,
/// Additional properties associated with the key
#[builder(default)]
- properties: HashMap<String, String>,
+ pub(crate) properties: HashMap<String, String>,
}
impl EncryptedKey {
@@ -50,8 +50,8 @@ impl EncryptedKey {
}
/// Returns the ID of the entity that encrypted this key
- pub fn encrypted_by_id(&self) -> &str {
- &self.encrypted_by_id
+ pub fn encrypted_by_id(&self) -> Option<&str> {
+ self.encrypted_by_id.as_deref()
}
/// Returns the properties map
@@ -72,7 +72,7 @@ pub(super) mod _serde {
pub(super) struct EncryptedKeySerde {
pub key_id: String,
pub encrypted_key_metadata: String, // Base64 encoded
- pub encrypted_by_id: String,
+ pub encrypted_by_id: Option<String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, String>,
}
diff --git a/crates/iceberg/src/spec/manifest/data_file.rs
b/crates/iceberg/src/spec/manifest/data_file.rs
index 6c63f622..a9c041f5 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -24,7 +24,9 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use super::_serde::DataFileSerde;
-use super::{Datum, FormatVersion, Schema, data_file_schema_v1,
data_file_schema_v2};
+use super::{
+ Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2,
data_file_schema_v3,
+};
use crate::error::Result;
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};
@@ -295,6 +297,7 @@ pub fn write_data_files_to_avro<W: Write>(
let avro_schema = match version {
FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
+ FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
};
let mut writer = AvroWriter::new(&avro_schema, writer);
@@ -322,6 +325,7 @@ pub fn read_data_files_from_avro<R: Read>(
let avro_schema = match version {
FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
+ FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
};
let reader = AvroReader::with_schema(&avro_schema, reader)?;
diff --git a/crates/iceberg/src/spec/manifest/entry.rs
b/crates/iceberg/src/spec/manifest/entry.rs
index d11d8acf..e8fe0f22 100644
--- a/crates/iceberg/src/spec/manifest/entry.rs
+++ b/crates/iceberg/src/spec/manifest/entry.rs
@@ -509,6 +509,42 @@ static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
})
};
+fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
+ vec![
+ CONTENT.clone(),
+ FILE_PATH.clone(),
+ FILE_FORMAT.clone(),
+ Arc::new(NestedField::required(
+ 102,
+ "partition",
+ Type::Struct(partition_type.clone()),
+ )),
+ RECORD_COUNT.clone(),
+ FILE_SIZE_IN_BYTES.clone(),
+ COLUMN_SIZES.clone(),
+ VALUE_COUNTS.clone(),
+ NULL_VALUE_COUNTS.clone(),
+ NAN_VALUE_COUNTS.clone(),
+ LOWER_BOUNDS.clone(),
+ UPPER_BOUNDS.clone(),
+ KEY_METADATA.clone(),
+ SPLIT_OFFSETS.clone(),
+ EQUALITY_IDS.clone(),
+ SORT_ORDER_ID.clone(),
+ FIRST_ROW_ID.clone(),
+ REFERENCE_DATA_FILE.clone(),
+ CONTENT_OFFSET.clone(),
+ CONTENT_SIZE_IN_BYTES.clone(),
+ ]
+}
+
+pub(super) fn data_file_schema_v3(partition_type: &StructType) ->
Result<AvroSchema> {
+ let schema = Schema::builder()
+ .with_fields(data_file_fields_v3(partition_type))
+ .build()?;
+ schema_to_avro_schema("data_file", &schema)
+}
+
fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
CONTENT.clone(),
@@ -533,6 +569,8 @@ fn data_file_fields_v2(partition_type: &StructType) ->
Vec<NestedFieldRef> {
SORT_ORDER_ID.clone(),
FIRST_ROW_ID.clone(),
REFERENCE_DATA_FILE.clone(),
+ // Why are the following two fields here in the existing v2 schema?
+ // In the spec, they are not even listed as optional for v2.
CONTENT_OFFSET.clone(),
CONTENT_SIZE_IN_BYTES.clone(),
]
diff --git a/crates/iceberg/src/spec/manifest/mod.rs
b/crates/iceberg/src/spec/manifest/mod.rs
index a1a5612c..51219bfd 100644
--- a/crates/iceberg/src/spec/manifest/mod.rs
+++ b/crates/iceberg/src/spec/manifest/mod.rs
@@ -70,7 +70,8 @@ impl Manifest {
})
.collect::<Result<Vec<_>>>()?
}
- FormatVersion::V2 => {
+ // Manifest Schema & Manifest Entry did not change between V2 and
V3
+ FormatVersion::V2 | FormatVersion::V3 => {
let schema = manifest_schema_v2(&partition_type)?;
let reader = AvroReader::with_schema(&schema, bs)?;
reader
diff --git a/crates/iceberg/src/spec/manifest/writer.rs
b/crates/iceberg/src/spec/manifest/writer.rs
index 673f8b5d..ebb0590b 100644
--- a/crates/iceberg/src/spec/manifest/writer.rs
+++ b/crates/iceberg/src/spec/manifest/writer.rs
@@ -72,7 +72,13 @@ impl ManifestWriterBuilder {
.format_version(FormatVersion::V1)
.content(ManifestContentType::Data)
.build();
- ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata,
metadata)
+ ManifestWriter::new(
+ self.output,
+ self.snapshot_id,
+ self.key_metadata,
+ metadata,
+ None,
+ )
}
/// Build a [`ManifestWriter`] for format version 2, data content.
@@ -84,7 +90,13 @@ impl ManifestWriterBuilder {
.format_version(FormatVersion::V2)
.content(ManifestContentType::Data)
.build();
- ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata,
metadata)
+ ManifestWriter::new(
+ self.output,
+ self.snapshot_id,
+ self.key_metadata,
+ metadata,
+ None,
+ )
}
/// Build a [`ManifestWriter`] for format version 2, deletes content.
@@ -96,7 +108,51 @@ impl ManifestWriterBuilder {
.format_version(FormatVersion::V2)
.content(ManifestContentType::Deletes)
.build();
- ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata,
metadata)
+ ManifestWriter::new(
+ self.output,
+ self.snapshot_id,
+ self.key_metadata,
+ metadata,
+ None,
+ )
+ }
+
+ /// Build a [`ManifestWriter`] for format version 3, data content.
+ pub fn build_v3_data(self) -> ManifestWriter {
+ let metadata = ManifestMetadata::builder()
+ .schema_id(self.schema.schema_id())
+ .schema(self.schema)
+ .partition_spec(self.partition_spec)
+ .format_version(FormatVersion::V3)
+ .content(ManifestContentType::Data)
+ .build();
+ ManifestWriter::new(
+ self.output,
+ self.snapshot_id,
+ self.key_metadata,
+ metadata,
+ // First row id is assigned by the [`ManifestListWriter`] when the
manifest
+ // is added to the list.
+ None,
+ )
+ }
+
+ /// Build a [`ManifestWriter`] for format version 3, deletes content.
+ pub fn build_v3_deletes(self) -> ManifestWriter {
+ let metadata = ManifestMetadata::builder()
+ .schema_id(self.schema.schema_id())
+ .schema(self.schema)
+ .partition_spec(self.partition_spec)
+ .format_version(FormatVersion::V3)
+ .content(ManifestContentType::Deletes)
+ .build();
+ ManifestWriter::new(
+ self.output,
+ self.snapshot_id,
+ self.key_metadata,
+ metadata,
+ None,
+ )
}
}
@@ -112,6 +168,7 @@ pub struct ManifestWriter {
existing_rows: u64,
deleted_files: u32,
deleted_rows: u64,
+ first_row_id: Option<u64>,
min_seq_num: Option<i64>,
@@ -129,6 +186,7 @@ impl ManifestWriter {
snapshot_id: Option<i64>,
key_metadata: Option<Vec<u8>>,
metadata: ManifestMetadata,
+ first_row_id: Option<u64>,
) -> Self {
Self {
output,
@@ -139,6 +197,7 @@ impl ManifestWriter {
existing_rows: 0,
deleted_files: 0,
deleted_rows: 0,
+ first_row_id,
min_seq_num: None,
key_metadata,
manifest_entries: Vec::new(),
@@ -348,7 +407,8 @@ impl ManifestWriter {
let table_schema = &self.metadata.schema;
let avro_schema = match self.metadata.format_version {
FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
- FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
+ // Manifest schema did not change between V2 and V3
+ FormatVersion::V2 | FormatVersion::V3 =>
manifest_schema_v2(&partition_type)?,
};
let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
avro_writer.add_user_metadata(
@@ -388,8 +448,11 @@ impl ManifestWriter {
let value = match self.metadata.format_version {
FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry,
&partition_type)?)?
.resolve(&avro_schema)?,
- FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry,
&partition_type)?)?
- .resolve(&avro_schema)?,
+ // Manifest entry format did not change between V2 and V3
+ FormatVersion::V2 | FormatVersion::V3 => {
+ to_value(ManifestEntryV2::try_from(entry,
&partition_type)?)?
+ .resolve(&avro_schema)?
+ }
};
avro_writer.append(value)?;
@@ -417,6 +480,7 @@ impl ManifestWriter {
deleted_rows_count: Some(self.deleted_rows),
partitions: Some(partition_summary),
key_metadata: self.key_metadata,
+ first_row_id: self.first_row_id,
})
}
}
diff --git a/crates/iceberg/src/spec/manifest_list.rs
b/crates/iceberg/src/spec/manifest_list.rs
index 16409ffe..5e97e546 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -31,6 +31,8 @@ use self::_serde::{ManifestFileV1, ManifestFileV2};
use super::{FormatVersion, Manifest};
use crate::error::Result;
use crate::io::{FileIO, OutputFile};
+use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
+use crate::spec::manifest_list::_serde::ManifestFileV3;
use crate::{Error, ErrorKind};
/// Placeholder for sequence number. The field with this value must be
replaced with the actual sequence number before it write.
@@ -69,6 +71,11 @@ impl ManifestList {
let values =
Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV2>(&values)?.try_into()
}
+ FormatVersion::V3 => {
+ let reader = Reader::new(bs)?;
+ let values =
Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
+ from_value::<_serde::ManifestListV3>(&values)?.try_into()
+ }
}
}
@@ -90,6 +97,7 @@ pub struct ManifestListWriter {
avro_writer: Writer<'static, Vec<u8>>,
sequence_number: i64,
snapshot_id: i64,
+ next_row_id: Option<u64>,
}
impl std::fmt::Debug for ManifestListWriter {
@@ -103,6 +111,11 @@ impl std::fmt::Debug for ManifestListWriter {
}
impl ManifestListWriter {
+ /// Get the next row ID that will be assigned to the next data manifest
added.
+ pub fn next_row_id(&self) -> Option<u64> {
+ self.next_row_id
+ }
+
/// Construct a v1 [`ManifestListWriter`] that writes to a provided
[`OutputFile`].
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id:
Option<i64>) -> Self {
let mut metadata = HashMap::from_iter([
@@ -115,7 +128,14 @@ impl ManifestListWriter {
parent_snapshot_id.to_string(),
);
}
- Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
+ Self::new(
+ FormatVersion::V1,
+ output_file,
+ metadata,
+ 0,
+ snapshot_id,
+ None,
+ )
}
/// Construct a v2 [`ManifestListWriter`] that writes to a provided
[`OutputFile`].
@@ -142,6 +162,42 @@ impl ManifestListWriter {
metadata,
sequence_number,
snapshot_id,
+ None,
+ )
+ }
+
+ /// Construct a v3 [`ManifestListWriter`] that writes to a provided
[`OutputFile`].
+ pub fn v3(
+ output_file: OutputFile,
+ snapshot_id: i64,
+ parent_snapshot_id: Option<i64>,
+ sequence_number: i64,
+ first_row_id: Option<u64>, // Always None for delete manifests
+ ) -> Self {
+ let mut metadata = HashMap::from_iter([
+ ("snapshot-id".to_string(), snapshot_id.to_string()),
+ ("sequence-number".to_string(), sequence_number.to_string()),
+ ("format-version".to_string(), "3".to_string()),
+ ]);
+ metadata.insert(
+ "parent-snapshot-id".to_string(),
+ parent_snapshot_id
+ .map(|v| v.to_string())
+ .unwrap_or("null".to_string()),
+ );
+ metadata.insert(
+ "first-row-id".to_string(),
+ first_row_id
+ .map(|v| v.to_string())
+ .unwrap_or("null".to_string()),
+ );
+ Self::new(
+ FormatVersion::V3,
+ output_file,
+ metadata,
+ sequence_number,
+ snapshot_id,
+ first_row_id,
)
}
@@ -151,10 +207,12 @@ impl ManifestListWriter {
metadata: HashMap<String, String>,
sequence_number: i64,
snapshot_id: i64,
+ first_row_id: Option<u64>,
) -> Self {
let avro_schema = match format_version {
FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
+ FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
};
let mut avro_writer = Writer::new(avro_schema, Vec::new());
for (key, value) in metadata {
@@ -168,46 +226,35 @@ impl ManifestListWriter {
avro_writer,
sequence_number,
snapshot_id,
+ next_row_id: first_row_id,
}
}
/// Append manifests to be written.
+ ///
+ /// If V3 Manifests are added and the `first_row_id` of any data manifest
is unassigned,
+ /// it will be assigned based on the `next_row_id` of the writer, and the
`next_row_id` of the writer will be updated accordingly.
+ /// If `first_row_id` is already assigned, it will be validated against
the `next_row_id` of the writer.
pub fn add_manifests(&mut self, manifests: impl Iterator<Item =
ManifestFile>) -> Result<()> {
match self.format_version {
FormatVersion::V1 => {
for manifest in manifests {
- let manifes: ManifestFileV1 = manifest.try_into()?;
- self.avro_writer.append_ser(manifes)?;
+ let manifests: ManifestFileV1 = manifest.try_into()?;
+ self.avro_writer.append_ser(manifests)?;
}
}
- FormatVersion::V2 => {
+ FormatVersion::V2 | FormatVersion::V3 => {
for mut manifest in manifests {
- if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
- if manifest.added_snapshot_id != self.snapshot_id {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Found unassigned sequence number for a
manifest from snapshot {}.",
- manifest.added_snapshot_id
- ),
- ));
- }
- manifest.sequence_number = self.sequence_number;
+ self.assign_sequence_numbers(&mut manifest)?;
+
+ if self.format_version == FormatVersion::V2 {
+ let manifest_entry: ManifestFileV2 =
manifest.try_into()?;
+ self.avro_writer.append_ser(manifest_entry)?;
+ } else if self.format_version == FormatVersion::V3 {
+ self.assign_first_row_id(&mut manifest)?;
+ let manifest_entry: ManifestFileV3 =
manifest.try_into()?;
+ self.avro_writer.append_ser(manifest_entry)?;
}
- if manifest.min_sequence_number ==
UNASSIGNED_SEQUENCE_NUMBER {
- if manifest.added_snapshot_id != self.snapshot_id {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Found unassigned sequence number for a
manifest from snapshot {}.",
- manifest.added_snapshot_id
- ),
- ));
- }
- manifest.min_sequence_number = self.sequence_number;
- }
- let manifest_entry: ManifestFileV2 = manifest.try_into()?;
- self.avro_writer.append_ser(manifest_entry)?;
}
}
}
@@ -222,6 +269,112 @@ impl ManifestListWriter {
writer.close().await?;
Ok(())
}
+
+ /// Assign sequence numbers to manifest if they are unassigned
+ fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) ->
Result<()> {
+ if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
+ if manifest.added_snapshot_id != self.snapshot_id {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Found unassigned sequence number for a manifest from
snapshot {}.",
+ manifest.added_snapshot_id
+ ),
+ ));
+ }
+ manifest.sequence_number = self.sequence_number;
+ }
+
+ if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
+ if manifest.added_snapshot_id != self.snapshot_id {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Found unassigned sequence number for a manifest from
snapshot {}.",
+ manifest.added_snapshot_id
+ ),
+ ));
+ }
+ manifest.min_sequence_number = self.sequence_number;
+ }
+
+ Ok(())
+ }
+
+ /// Assign a first-row-id to a data manifest if unassigned, advancing the writer's `next_row_id`.
+ fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) ->
Result<()> {
+ match manifest.content {
+ ManifestContentType::Data => {
+ match (self.next_row_id, manifest.first_row_id) {
+ (Some(_), Some(_)) => {
+ // Case: Manifest with already assigned first row ID.
+ // No need to increase next_row_id, as this manifest
is already assigned.
+ }
+ (None, Some(manifest_first_row_id)) => {
+ // Case: Assigned first row ID for data manifest, but
the writer does not have a next-row-id assigned.
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Found invalid first-row-id assignment for
Manifest {}. Writer does not have a next-row-id assigned, but the manifest has
first-row-id assigned to {}.",
+ manifest.manifest_path, manifest_first_row_id,
+ ),
+ ));
+ }
+ (Some(writer_next_row_id), None) => {
+ // Case: Unassigned first row ID for data manifest.
This is either a new
+ // manifest, or a manifest from a pre-v3 snapshot. We
need to assign one.
+ let (existing_rows_count, added_rows_count) =
+ require_row_counts_in_manifest(manifest)?;
+ manifest.first_row_id = Some(writer_next_row_id);
+
+ self.next_row_id = writer_next_row_id
+ .checked_add(existing_rows_count)
+ .and_then(|sum| sum.checked_add(added_rows_count))
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Row ID overflow when computing next row
ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count:
{existing_rows_count}, Added Rows Count: {added_rows_count}",
+ manifest.manifest_path
+ ),
+ )
+ }).map(Some)?;
+ }
+ (None, None) => {
+ // Case: Table without row lineage. No action needed.
+ }
+ }
+ }
+ ManifestContentType::Deletes => {
+ // Deletes never have a first-row-id assigned.
+ manifest.first_row_id = None;
+ }
+ };
+
+ Ok(())
+ }
+}
+
+fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64,
u64)> {
+ let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot include a Manifest without existing-rows-count to a
table with row lineage enabled. Manifest path: {}",
+ manifest.manifest_path,
+ ),
+ )
+ })?;
+ let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot include a Manifest without added-rows-count to a table
with row lineage enabled. Manifest path: {}",
+ manifest.manifest_path,
+ ),
+ )
+ })?;
+ Ok((existing_rows_count, added_rows_count))
}
/// This is a helper module that defines the schema field of the manifest list
entry.
@@ -453,6 +606,15 @@ mod _const_schema {
))
})
};
+ static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 520,
+ "first_row_id",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
static V1_SCHEMA: Lazy<Schema> = {
Lazy::new(|| {
@@ -497,11 +659,38 @@ mod _const_schema {
})
};
+ static V3_SCHEMA: Lazy<Schema> = {
+ Lazy::new(|| {
+ let fields = vec![
+ MANIFEST_PATH.clone(),
+ MANIFEST_LENGTH.clone(),
+ PARTITION_SPEC_ID.clone(),
+ CONTENT.clone(),
+ SEQUENCE_NUMBER.clone(),
+ MIN_SEQUENCE_NUMBER.clone(),
+ ADDED_SNAPSHOT_ID.clone(),
+ ADDED_FILES_COUNT_V2.clone(),
+ EXISTING_FILES_COUNT_V2.clone(),
+ DELETED_FILES_COUNT_V2.clone(),
+ ADDED_ROWS_COUNT_V2.clone(),
+ EXISTING_ROWS_COUNT_V2.clone(),
+ DELETED_ROWS_COUNT_V2.clone(),
+ PARTITIONS.clone(),
+ KEY_METADATA.clone(),
+ FIRST_ROW_ID.clone(),
+ ];
+ Schema::builder().with_fields(fields).build().unwrap()
+ })
+ };
+
pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
Lazy::new(|| schema_to_avro_schema("manifest_file",
&V1_SCHEMA).unwrap());
pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
Lazy::new(|| schema_to_avro_schema("manifest_file",
&V2_SCHEMA).unwrap());
+
+ pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V3: Lazy<AvroSchema> =
+ Lazy::new(|| schema_to_avro_schema("manifest_file",
&V3_SCHEMA).unwrap());
}
/// Entry in a manifest list.
@@ -580,6 +769,10 @@ pub struct ManifestFile {
///
/// Implementation-specific key metadata for encryption
pub key_metadata: Option<Vec<u8>>,
+ /// field 520
+ ///
+ /// The starting _row_id to assign to rows added by ADDED data files
+ pub first_row_id: Option<u64>,
}
impl ManifestFile {
@@ -703,6 +896,12 @@ pub(super) mod _serde {
use crate::error::Result;
use crate::spec::FieldSummary;
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(transparent)]
+ pub(crate) struct ManifestListV3 {
+ entries: Vec<ManifestFileV3>,
+ }
+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub(crate) struct ManifestListV2 {
@@ -715,6 +914,33 @@ pub(super) mod _serde {
entries: Vec<ManifestFileV1>,
}
+ impl ManifestListV3 {
+ /// Converts the [ManifestListV3] into a [ManifestList].
+ pub fn try_into(self) -> Result<super::ManifestList> {
+ Ok(super::ManifestList {
+ entries: self
+ .entries
+ .into_iter()
+ .map(|v| v.try_into())
+ .collect::<Result<Vec<_>>>()?,
+ })
+ }
+ }
+
+ impl TryFrom<super::ManifestList> for ManifestListV3 {
+ type Error = Error;
+
+ fn try_from(value: super::ManifestList) -> std::result::Result<Self,
Self::Error> {
+ Ok(Self {
+ entries: value
+ .entries
+ .into_iter()
+ .map(|v| v.try_into())
+ .collect::<std::result::Result<Vec<_>, _>>()?,
+ })
+ }
+ }
+
impl ManifestListV2 {
/// Converts the [ManifestListV2] into a [ManifestList].
pub fn try_into(self) -> Result<super::ManifestList> {
@@ -813,6 +1039,58 @@ pub(super) mod _serde {
pub key_metadata: Option<ByteBuf>,
}
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ pub(super) struct ManifestFileV3 {
+ pub manifest_path: String,
+ pub manifest_length: i64,
+ pub partition_spec_id: i32,
+ #[serde(default = "v2_default_content_for_v1")]
+ pub content: i32,
+ #[serde(default = "v2_default_sequence_number_for_v1")]
+ pub sequence_number: i64,
+ #[serde(default = "v2_default_min_sequence_number_for_v1")]
+ pub min_sequence_number: i64,
+ pub added_snapshot_id: i64,
+ #[serde(alias = "added_data_files_count", alias = "added_files_count")]
+ pub added_files_count: i32,
+ #[serde(alias = "existing_data_files_count", alias =
"existing_files_count")]
+ pub existing_files_count: i32,
+ #[serde(alias = "deleted_data_files_count", alias =
"deleted_files_count")]
+ pub deleted_files_count: i32,
+ pub added_rows_count: i64,
+ pub existing_rows_count: i64,
+ pub deleted_rows_count: i64,
+ pub partitions: Option<Vec<FieldSummary>>,
+ pub key_metadata: Option<ByteBuf>,
+ pub first_row_id: Option<u64>,
+ }
+
+ impl ManifestFileV3 {
+ /// Converts the [ManifestFileV3] into a [ManifestFile].
+ pub fn try_into(self) -> Result<ManifestFile> {
+ let manifest_file = ManifestFile {
+ manifest_path: self.manifest_path,
+ manifest_length: self.manifest_length,
+ partition_spec_id: self.partition_spec_id,
+ content: self.content.try_into()?,
+ sequence_number: self.sequence_number,
+ min_sequence_number: self.min_sequence_number,
+ added_snapshot_id: self.added_snapshot_id,
+ added_files_count: Some(self.added_files_count.try_into()?),
+ existing_files_count:
Some(self.existing_files_count.try_into()?),
+ deleted_files_count:
Some(self.deleted_files_count.try_into()?),
+ added_rows_count: Some(self.added_rows_count.try_into()?),
+ existing_rows_count:
Some(self.existing_rows_count.try_into()?),
+ deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
+ partitions: self.partitions,
+ key_metadata: self.key_metadata.map(|b| b.into_vec()),
+ first_row_id: self.first_row_id,
+ };
+
+ Ok(manifest_file)
+ }
+ }
+
impl ManifestFileV2 {
/// Converts the [ManifestFileV2] into a [ManifestFile].
pub fn try_into(self) -> Result<ManifestFile> {
@@ -832,6 +1110,7 @@ pub(super) mod _serde {
deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
partitions: self.partitions,
key_metadata: self.key_metadata.map(|b| b.into_vec()),
+ first_row_id: None,
})
}
}
@@ -881,6 +1160,7 @@ pub(super) mod _serde {
content: super::ManifestContentType::Data,
sequence_number: 0,
min_sequence_number: 0,
+ first_row_id: None,
})
}
}
@@ -892,6 +1172,80 @@ pub(super) mod _serde {
}
}
+ impl TryFrom<ManifestFile> for ManifestFileV3 {
+ type Error = Error;
+
+ fn try_from(value: ManifestFile) -> std::result::Result<Self,
Self::Error> {
+ let key_metadata =
convert_to_serde_key_metadata(value.key_metadata);
+ Ok(Self {
+ manifest_path: value.manifest_path,
+ manifest_length: value.manifest_length,
+ partition_spec_id: value.partition_spec_id,
+ content: value.content as i32,
+ sequence_number: value.sequence_number,
+ min_sequence_number: value.min_sequence_number,
+ added_snapshot_id: value.added_snapshot_id,
+ added_files_count: value
+ .added_files_count
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "added_data_files_count in ManifestFileV3 is
required",
+ )
+ })?
+ .try_into()?,
+ existing_files_count: value
+ .existing_files_count
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "existing_data_files_count in ManifestFileV3 is
required",
+ )
+ })?
+ .try_into()?,
+ deleted_files_count: value
+ .deleted_files_count
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "deleted_data_files_count in ManifestFileV3 is
required",
+ )
+ })?
+ .try_into()?,
+ added_rows_count: value
+ .added_rows_count
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "added_rows_count in ManifestFileV3 is required",
+ )
+ })?
+ .try_into()?,
+ existing_rows_count: value
+ .existing_rows_count
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "existing_rows_count in ManifestFileV3 is
required",
+ )
+ })?
+ .try_into()?,
+ deleted_rows_count: value
+ .deleted_rows_count
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "deleted_rows_count in ManifestFileV3 is required",
+ )
+ })?
+ .try_into()?,
+ partitions: value.partitions,
+ key_metadata,
+ first_row_id: value.first_row_id,
+ })
+ }
+ }
+
impl TryFrom<ManifestFile> for ManifestFileV2 {
type Error = Error;
@@ -1012,7 +1366,7 @@ mod test {
use super::_serde::ManifestListV2;
use crate::io::FileIOBuilder;
- use crate::spec::manifest_list::_serde::ManifestListV1;
+ use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3};
use crate::spec::{
Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList,
ManifestListWriter,
UNASSIGNED_SEQUENCE_NUMBER,
@@ -1038,6 +1392,7 @@ mod test {
deleted_rows_count: Some(0),
partitions: Some(vec![]),
key_metadata: None,
+ first_row_id: None,
}
]
};
@@ -1089,6 +1444,7 @@ mod test {
vec![FieldSummary { contains_null: false,
contains_nan: Some(false), lower_bound:
Some(Datum::long(1).to_bytes().unwrap()), upper_bound:
Some(Datum::long(1).to_bytes().unwrap())}]
),
key_metadata: None,
+ first_row_id: None,
},
ManifestFile {
manifest_path:
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
@@ -1108,6 +1464,7 @@ mod test {
vec![FieldSummary { contains_null: false,
contains_nan: Some(false), lower_bound:
Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound:
Some(Datum::float(2.1).to_bytes().unwrap())}]
),
key_metadata: None,
+ first_row_id: None,
}
]
};
@@ -1138,6 +1495,80 @@ mod test {
assert_eq!(manifest_list, parsed_manifest_list);
}
+ #[tokio::test]
+ async fn test_parse_manifest_list_v3() {
+ let manifest_list = ManifestList {
+ entries: vec![
+ ManifestFile {
+ manifest_path:
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+ manifest_length: 6926,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: 1,
+ min_sequence_number: 1,
+ added_snapshot_id: 377075049360453639,
+ added_files_count: Some(1),
+ existing_files_count: Some(0),
+ deleted_files_count: Some(0),
+ added_rows_count: Some(3),
+ existing_rows_count: Some(0),
+ deleted_rows_count: Some(0),
+ partitions: Some(
+ vec![FieldSummary { contains_null: false,
contains_nan: Some(false), lower_bound:
Some(Datum::long(1).to_bytes().unwrap()), upper_bound:
Some(Datum::long(1).to_bytes().unwrap())}]
+ ),
+ key_metadata: None,
+ first_row_id: Some(10),
+ },
+ ManifestFile {
+ manifest_path:
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
+ manifest_length: 6926,
+ partition_spec_id: 2,
+ content: ManifestContentType::Data,
+ sequence_number: 1,
+ min_sequence_number: 1,
+ added_snapshot_id: 377075049360453639,
+ added_files_count: Some(1),
+ existing_files_count: Some(0),
+ deleted_files_count: Some(0),
+ added_rows_count: Some(3),
+ existing_rows_count: Some(0),
+ deleted_rows_count: Some(0),
+ partitions: Some(
+ vec![FieldSummary { contains_null: false,
contains_nan: Some(false), lower_bound:
Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound:
Some(Datum::float(2.1).to_bytes().unwrap())}]
+ ),
+ key_metadata: None,
+ first_row_id: Some(13),
+ }
+ ]
+ };
+
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+ let tmp_dir = TempDir::new().unwrap();
+ let file_name = "simple_manifest_list_v3.avro";
+ let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(),
file_name);
+
+ let mut writer = ManifestListWriter::v3(
+ file_io.new_output(full_path.clone()).unwrap(),
+ 377075049360453639,
+ Some(377075049360453639),
+ 1,
+ Some(10),
+ );
+
+ writer
+ .add_manifests(manifest_list.entries.clone().into_iter())
+ .unwrap();
+ writer.close().await.unwrap();
+
+ let bs = fs::read(full_path).expect("read_file must succeed");
+
+ let parsed_manifest_list =
+ ManifestList::parse_with_version(&bs,
crate::spec::FormatVersion::V3).unwrap();
+
+ assert_eq!(manifest_list, parsed_manifest_list);
+ }
+
#[test]
fn test_serialize_manifest_list_v1() {
let manifest_list:ManifestListV1 = ManifestList {
@@ -1157,6 +1588,7 @@ mod test {
deleted_rows_count: Some(0),
partitions: None,
key_metadata: None,
+ first_row_id: None,
}]
}.try_into().unwrap();
let result = serde_json::to_string(&manifest_list).unwrap();
@@ -1187,6 +1619,7 @@ mod test {
vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
),
key_metadata: None,
+ first_row_id: None,
}]
}.try_into().unwrap();
let result = serde_json::to_string(&manifest_list).unwrap();
@@ -1196,6 +1629,37 @@ mod test {
);
}
+ #[test]
+ fn test_serialize_manifest_list_v3() {
+ let manifest_list: ManifestListV3 = ManifestList {
+ entries: vec![ManifestFile {
+ manifest_path:
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+ manifest_length: 6926,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: 1,
+ min_sequence_number: 1,
+ added_snapshot_id: 377075049360453639,
+ added_files_count: Some(1),
+ existing_files_count: Some(0),
+ deleted_files_count: Some(0),
+ added_rows_count: Some(3),
+ existing_rows_count: Some(0),
+ deleted_rows_count: Some(0),
+ partitions: Some(
+ vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+ ),
+ key_metadata: None,
+ first_row_id: Some(10),
+ }]
+ }.try_into().unwrap();
+ let result = serde_json::to_string(&manifest_list).unwrap();
+ assert_eq!(
+ result,
+
r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"uppe
[...]
+ );
+ }
+
#[tokio::test]
async fn test_manifest_list_writer_v1() {
let expected_manifest_list = ManifestList {
@@ -1217,6 +1681,7 @@ mod test {
vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
),
key_metadata: None,
+ first_row_id: None,
}]
};
@@ -1263,6 +1728,7 @@ mod test {
vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
),
key_metadata: None,
+ first_row_id: None,
}]
};
@@ -1287,6 +1753,56 @@ mod test {
temp_dir.close().unwrap();
}
+ #[tokio::test]
+ async fn test_manifest_list_writer_v3() {
+ let snapshot_id = 377075049360453639;
+ let seq_num = 1;
+ let mut expected_manifest_list = ManifestList {
+ entries: vec![ManifestFile {
+ manifest_path:
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+ manifest_length: 6926,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+ min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+ added_snapshot_id: snapshot_id,
+ added_files_count: Some(1),
+ existing_files_count: Some(0),
+ deleted_files_count: Some(0),
+ added_rows_count: Some(3),
+ existing_rows_count: Some(0),
+ deleted_rows_count: Some(0),
+ partitions: Some(
+ vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+ ),
+ key_metadata: None,
+ first_row_id: Some(10),
+ }]
+ };
+
+ let temp_dir = TempDir::new().unwrap();
+ let path = temp_dir.path().join("manifest_list_v2.avro");
+ let io = FileIOBuilder::new_fs_io().build().unwrap();
+ let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+ let mut writer =
+ ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num,
Some(10));
+ writer
+ .add_manifests(expected_manifest_list.entries.clone().into_iter())
+ .unwrap();
+ writer.close().await.unwrap();
+
+ let bs = fs::read(path).unwrap();
+ let manifest_list =
+ ManifestList::parse_with_version(&bs,
crate::spec::FormatVersion::V3).unwrap();
+ expected_manifest_list.entries[0].sequence_number = seq_num;
+ expected_manifest_list.entries[0].min_sequence_number = seq_num;
+ expected_manifest_list.entries[0].first_row_id = Some(10);
+ assert_eq!(manifest_list, expected_manifest_list);
+
+ temp_dir.close().unwrap();
+ }
+
#[tokio::test]
async fn test_manifest_list_writer_v1_as_v2() {
let expected_manifest_list = ManifestList {
@@ -1308,6 +1824,7 @@ mod test {
vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
),
key_metadata: None,
+ first_row_id: None,
}]
};
@@ -1331,6 +1848,100 @@ mod test {
temp_dir.close().unwrap();
}
+ #[tokio::test]
+ async fn test_manifest_list_writer_v1_as_v3() {
+ let expected_manifest_list = ManifestList {
+ entries: vec![ManifestFile {
+ manifest_path:
"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
+ manifest_length: 5806,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: 0,
+ min_sequence_number: 0,
+ added_snapshot_id: 1646658105718557341,
+ added_files_count: Some(3),
+ existing_files_count: Some(0),
+ deleted_files_count: Some(0),
+ added_rows_count: Some(3),
+ existing_rows_count: Some(0),
+ deleted_rows_count: Some(0),
+ partitions: Some(
+ vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+ ),
+ key_metadata: None,
+ first_row_id: None,
+ }]
+ };
+
+ let temp_dir = TempDir::new().unwrap();
+ let path = temp_dir.path().join("manifest_list_v1.avro");
+ let io = FileIOBuilder::new_fs_io().build().unwrap();
+ let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+ let mut writer = ManifestListWriter::v1(output_file,
1646658105718557341, Some(0));
+ writer
+ .add_manifests(expected_manifest_list.entries.clone().into_iter())
+ .unwrap();
+ writer.close().await.unwrap();
+
+ let bs = fs::read(path).unwrap();
+
+ let manifest_list =
+ ManifestList::parse_with_version(&bs,
crate::spec::FormatVersion::V3).unwrap();
+ assert_eq!(manifest_list, expected_manifest_list);
+
+ temp_dir.close().unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_manifest_list_writer_v2_as_v3() {
+ let snapshot_id = 377075049360453639;
+ let seq_num = 1;
+ let mut expected_manifest_list = ManifestList {
+ entries: vec![ManifestFile {
+ manifest_path:
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+ manifest_length: 6926,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+ min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+ added_snapshot_id: snapshot_id,
+ added_files_count: Some(1),
+ existing_files_count: Some(0),
+ deleted_files_count: Some(0),
+ added_rows_count: Some(3),
+ existing_rows_count: Some(0),
+ deleted_rows_count: Some(0),
+ partitions: Some(
+ vec![FieldSummary { contains_null: false, contains_nan:
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+ ),
+ key_metadata: None,
+ first_row_id: None,
+ }]
+ };
+
+ let temp_dir = TempDir::new().unwrap();
+ let path = temp_dir.path().join("manifest_list_v2.avro");
+ let io = FileIOBuilder::new_fs_io().build().unwrap();
+ let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+ let mut writer = ManifestListWriter::v2(output_file, snapshot_id,
Some(0), seq_num);
+ writer
+ .add_manifests(expected_manifest_list.entries.clone().into_iter())
+ .unwrap();
+ writer.close().await.unwrap();
+
+ let bs = fs::read(path).unwrap();
+
+ let manifest_list =
+ ManifestList::parse_with_version(&bs,
crate::spec::FormatVersion::V3).unwrap();
+ expected_manifest_list.entries[0].sequence_number = seq_num;
+ expected_manifest_list.entries[0].min_sequence_number = seq_num;
+ assert_eq!(manifest_list, expected_manifest_list);
+
+ temp_dir.close().unwrap();
+ }
+
#[tokio::test]
async fn test_manifest_list_v2_deserializer_aliases() {
// reading avro manifest file generated by iceberg 1.4.0
diff --git a/crates/iceberg/src/spec/snapshot.rs
b/crates/iceberg/src/spec/snapshot.rs
index 809bf099..5371cf68 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -21,7 +21,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use _serde::SnapshotV2;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
@@ -82,33 +81,52 @@ impl Default for Operation {
}
}
-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
-#[serde(from = "SnapshotV2", into = "SnapshotV2")]
+#[derive(Debug, PartialEq, Eq, Clone)]
+/// Row range of a snapshot, contains first_row_id and added_rows_count.
+pub struct SnapshotRowRange {
+ /// The first _row_id assigned to the first row in the first data file in
the first manifest.
+ pub first_row_id: u64,
+ /// The upper bound of the number of rows with assigned row IDs
+ pub added_rows: u64,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
/// A snapshot represents the state of a table at some time and is used to
access the complete set of data files in the table.
pub struct Snapshot {
/// A unique long ID
- snapshot_id: i64,
+ pub(crate) snapshot_id: i64,
/// The snapshot ID of the snapshot’s parent.
/// Omitted for any snapshot with no parent
#[builder(default = None)]
- parent_snapshot_id: Option<i64>,
+ pub(crate) parent_snapshot_id: Option<i64>,
/// A monotonically increasing long that tracks the order of
/// changes to a table.
- sequence_number: i64,
+ pub(crate) sequence_number: i64,
/// A timestamp when the snapshot was created, used for garbage
/// collection and table inspection
- timestamp_ms: i64,
+ pub(crate) timestamp_ms: i64,
/// The location of a manifest list for this snapshot that
/// tracks manifest files with additional metadata.
/// Currently we only support manifest list file, and manifest files are
not supported.
#[builder(setter(into))]
- manifest_list: String,
+ pub(crate) manifest_list: String,
/// A string map that summarizes the snapshot changes, including operation.
- summary: Summary,
+ pub(crate) summary: Summary,
/// ID of the table’s current schema when the snapshot was created.
#[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
- schema_id: Option<SchemaId>,
+ pub(crate) schema_id: Option<SchemaId>,
+ /// Encryption Key ID
+ #[builder(default)]
+ pub(crate) encryption_key_id: Option<String>,
+ /// Row range of this snapshot, required when the table version supports
row lineage.
+ /// Specify as a tuple of (first_row_id, added_rows_count)
+ #[builder(default, setter(!strip_option, transform = |first_row_id: u64,
added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
+ // This is specified as a struct instead of two separate fields to ensure
that both fields are either set or not set.
+ // The Java implementation uses two separate fields, then sets
`added_row_counts` to Null if `first_row_id` is set to Null.
+ // It throws an error if `added_row_counts` is set but `first_row_id` is
not set, or if either of the two is negative.
+ // We handle all cases infallibly using the Rust type system.
+ pub(crate) row_range: Option<SnapshotRowRange>,
}
impl Snapshot {
@@ -205,6 +223,37 @@ impl Snapshot {
snapshot_id: self.snapshot_id,
}
}
+
+ /// The row-id of the first newly added row in this snapshot. All rows
added in this snapshot will
+ /// have a row-id assigned to them greater than this value. All rows with
a row-id less than this
+ /// value were created in a snapshot that was added to the table (but not
necessarily committed to
+ /// this branch) in the past.
+ ///
+ /// This field is optional but is required when the table version supports
row lineage.
+ pub fn first_row_id(&self) -> Option<u64> {
+ self.row_range.as_ref().map(|r| r.first_row_id)
+ }
+
+ /// The total number of newly added rows in this snapshot. It should be
the summation of {@link
+ /// ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this
snapshot.
+ ///
+ /// This field is optional but is required when the table version supports
row lineage.
+ pub fn added_rows_count(&self) -> Option<u64> {
+ self.row_range.as_ref().map(|r| r.added_rows)
+ }
+
+ /// Returns the row range of this snapshot, if available.
+ /// This is a tuple containing (first_row_id, added_rows_count).
+ pub fn row_range(&self) -> Option<(u64, u64)> {
+ self.row_range
+ .as_ref()
+ .map(|r| (r.first_row_id, r.added_rows))
+ }
+
+ /// Get encryption key id, if available.
+ pub fn encryption_key_id(&self) -> Option<&str> {
+ self.encryption_key_id.as_deref()
+ }
}
pub(super) mod _serde {
@@ -219,6 +268,26 @@ pub(super) mod _serde {
use super::{Operation, Snapshot, Summary};
use crate::Error;
use crate::spec::SchemaId;
+ use crate::spec::snapshot::SnapshotRowRange;
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(rename_all = "kebab-case")]
+ /// Defines the structure of a v3 snapshot for
serialization/deserialization
+ pub(crate) struct SnapshotV3 {
+ pub snapshot_id: i64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub parent_snapshot_id: Option<i64>,
+ pub sequence_number: i64,
+ pub timestamp_ms: i64,
+ pub manifest_list: String,
+ pub summary: Summary,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub schema_id: Option<SchemaId>,
+ pub first_row_id: u64,
+ pub added_rows: u64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub key_id: Option<String>,
+ }
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
@@ -253,6 +322,51 @@ pub(super) mod _serde {
pub schema_id: Option<SchemaId>,
}
+ impl From<SnapshotV3> for Snapshot {
+ fn from(s: SnapshotV3) -> Self {
+ Snapshot {
+ snapshot_id: s.snapshot_id,
+ parent_snapshot_id: s.parent_snapshot_id,
+ sequence_number: s.sequence_number,
+ timestamp_ms: s.timestamp_ms,
+ manifest_list: s.manifest_list,
+ summary: s.summary,
+ schema_id: s.schema_id,
+ encryption_key_id: s.key_id,
+ row_range: Some(SnapshotRowRange {
+ first_row_id: s.first_row_id,
+ added_rows: s.added_rows,
+ }),
+ }
+ }
+ }
+
+ impl TryFrom<Snapshot> for SnapshotV3 {
+ type Error = Error;
+
+ fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
+ let row_range = s.row_range.ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "v3 Snapshots must have first-row-id and rows-added fields
set.".to_string(),
+ )
+ })?;
+
+ Ok(SnapshotV3 {
+ snapshot_id: s.snapshot_id,
+ parent_snapshot_id: s.parent_snapshot_id,
+ sequence_number: s.sequence_number,
+ timestamp_ms: s.timestamp_ms,
+ manifest_list: s.manifest_list,
+ summary: s.summary,
+ schema_id: s.schema_id,
+ first_row_id: row_range.first_row_id,
+ added_rows: row_range.added_rows,
+ key_id: s.encryption_key_id,
+ })
+ }
+ }
+
impl From<SnapshotV2> for Snapshot {
fn from(v2: SnapshotV2) -> Self {
Snapshot {
@@ -263,6 +377,8 @@ pub(super) mod _serde {
manifest_list: v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
+ encryption_key_id: None,
+ row_range: None,
}
}
}
@@ -300,6 +416,8 @@ pub(super) mod _serde {
additional_properties: HashMap::new(),
}),
schema_id: v1.schema_id,
+ encryption_key_id: None,
+ row_range: None,
})
}
}
diff --git a/crates/iceberg/src/spec/snapshot_summary.rs
b/crates/iceberg/src/spec/snapshot_summary.rs
index a9dd5699..4cd3715e 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -850,6 +850,7 @@ mod tests {
deleted_rows_count: Some(50),
partitions: Some(Vec::new()),
key_metadata: None,
+ first_row_id: None,
};
collector
@@ -974,6 +975,7 @@ mod tests {
deleted_rows_count: Some(0),
partitions: Some(Vec::new()),
key_metadata: None,
+ first_row_id: None,
});
summary_four.add_file(
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index a98dc4f4..437b0df5 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -38,6 +38,7 @@ use super::{
};
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
+use crate::spec::EncryptedKey;
use crate::{Error, ErrorKind};
static MAIN_BRANCH: &str = "main";
@@ -46,6 +47,10 @@ pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
+/// Initial row id for row lineage for new v3 tables and older tables
upgrading to v3.
+pub const INITIAL_ROW_ID: u64 = 0;
+/// Minimum format version that supports row lineage (v3).
+pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
@@ -123,8 +128,10 @@ pub struct TableMetadata {
pub(crate) statistics: HashMap<i64, StatisticsFile>,
/// Mapping of snapshot ids to partition statistics files.
pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
- /// Encryption Keys
- pub(crate) encryption_keys: HashMap<String, String>,
+ /// Encryption Keys - map of key id to the actual key
+ pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
+ /// Next row id to be assigned for Row Lineage (v3)
+ pub(crate) next_row_id: u64,
}
impl TableMetadata {
@@ -398,16 +405,22 @@ impl TableMetadata {
/// Iterate over all encryption keys
#[inline]
- pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item =
(&String, &String)> {
- self.encryption_keys.iter()
+ pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item =
&EncryptedKey> {
+ self.encryption_keys.values()
}
/// Get the encryption key for a given key id
#[inline]
- pub fn encryption_key(&self, key_id: &str) -> Option<&String> {
+ pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
self.encryption_keys.get(key_id)
}
+ /// Get the next row id to be assigned
+ #[inline]
+ pub fn next_row_id(&self) -> u64 {
+ self.next_row_id
+ }
+
/// Read table metadata from the given location.
pub async fn read_from(
file_io: &FileIO,
@@ -673,16 +686,18 @@ pub(super) mod _serde {
TableMetadata,
};
use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
- use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
+ use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
use crate::spec::{
- PartitionField, PartitionSpec, PartitionSpecRef,
PartitionStatisticsFile, Schema,
- SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortOrder,
StatisticsFile,
+ EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec,
PartitionSpecRef,
+ PartitionStatisticsFile, Schema, SchemaRef, Snapshot,
SnapshotReference, SnapshotRetention,
+ SortOrder, StatisticsFile,
};
use crate::{Error, ErrorKind};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub(super) enum TableMetadataEnum {
+ V3(TableMetadataV3),
V2(TableMetadataV2),
V1(TableMetadataV1),
}
@@ -690,8 +705,21 @@ pub(super) mod _serde {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Defines the structure of a v3 table metadata for
serialization/deserialization
- pub(super) struct TableMetadataV2 {
- pub format_version: VersionNumber<2>,
+ pub(super) struct TableMetadataV3 {
+ pub format_version: VersionNumber<3>,
+ #[serde(flatten)]
+ pub shared: TableMetadataV2V3Shared,
+ pub next_row_id: u64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub encryption_keys: Option<Vec<EncryptedKey>>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub snapshots: Option<Vec<SnapshotV3>>,
+ }
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(rename_all = "kebab-case")]
+ /// Defines the fields shared between v2 and v3 table metadata for
serialization/deserialization
+ pub(super) struct TableMetadataV2V3Shared {
pub table_uuid: Uuid,
pub location: String,
pub last_sequence_number: i64,
@@ -707,8 +735,6 @@ pub(super) mod _serde {
#[serde(skip_serializing_if = "Option::is_none")]
pub current_snapshot_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
- pub snapshots: Option<Vec<SnapshotV2>>,
- #[serde(skip_serializing_if = "Option::is_none")]
pub snapshot_log: Option<Vec<SnapshotLog>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata_log: Option<Vec<MetadataLog>>,
@@ -722,6 +748,17 @@ pub(super) mod _serde {
pub partition_statistics: Vec<PartitionStatisticsFile>,
}
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(rename_all = "kebab-case")]
+ /// Defines the structure of a v2 table metadata for
serialization/deserialization
+ pub(super) struct TableMetadataV2 {
+ pub format_version: VersionNumber<2>,
+ #[serde(flatten)]
+ pub shared: TableMetadataV2V3Shared,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub snapshots: Option<Vec<SnapshotV2>>,
+ }
+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Defines the structure of a v1 table metadata for
serialization/deserialization
@@ -802,6 +839,7 @@ pub(super) mod _serde {
type Error = Error;
fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
match value {
+ TableMetadataEnum::V3(value) => value.try_into(),
TableMetadataEnum::V2(value) => value.try_into(),
TableMetadataEnum::V1(value) => value.try_into(),
}
@@ -812,15 +850,136 @@ pub(super) mod _serde {
type Error = Error;
fn try_from(value: TableMetadata) -> Result<Self, Error> {
Ok(match value.format_version {
+ FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
})
}
}
+ impl TryFrom<TableMetadataV3> for TableMetadata {
+ type Error = Error;
+ fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
+ let TableMetadataV3 {
+ format_version: _,
+ shared: value,
+ next_row_id,
+ encryption_keys,
+ snapshots,
+ } = value;
+ let current_snapshot_id = if let &Some(-1) =
&value.current_snapshot_id {
+ None
+ } else {
+ value.current_snapshot_id
+ };
+ let schemas = HashMap::from_iter(
+ value
+ .schemas
+ .into_iter()
+ .map(|schema| Ok((schema.schema_id,
Arc::new(schema.try_into()?))))
+ .collect::<Result<Vec<_>, Error>>()?,
+ );
+
+ let current_schema: &SchemaRef =
+ schemas.get(&value.current_schema_id).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No schema exists with the current schema id {}.",
+ value.current_schema_id
+ ),
+ )
+ })?;
+ let partition_specs = HashMap::from_iter(
+ value
+ .partition_specs
+ .into_iter()
+ .map(|x| (x.spec_id(), Arc::new(x))),
+ );
+ let default_spec_id = value.default_spec_id;
+ let default_spec: PartitionSpecRef = partition_specs
+ .get(&value.default_spec_id)
+ .map(|spec| (**spec).clone())
+ .or_else(|| {
+ (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
+ .then(PartitionSpec::unpartition_spec)
+ })
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Default partition spec {default_spec_id} not
found"),
+ )
+ })?
+ .into();
+ let default_partition_type =
default_spec.partition_type(current_schema)?;
+
+ let mut metadata = TableMetadata {
+ format_version: FormatVersion::V3,
+ table_uuid: value.table_uuid,
+ location: value.location,
+ last_sequence_number: value.last_sequence_number,
+ last_updated_ms: value.last_updated_ms,
+ last_column_id: value.last_column_id,
+ current_schema_id: value.current_schema_id,
+ schemas,
+ partition_specs,
+ default_partition_type,
+ default_spec,
+ last_partition_id: value.last_partition_id,
+ properties: value.properties.unwrap_or_default(),
+ current_snapshot_id,
+ snapshots: snapshots
+ .map(|snapshots| {
+ HashMap::from_iter(
+ snapshots
+ .into_iter()
+ .map(|x| (x.snapshot_id, Arc::new(x.into()))),
+ )
+ })
+ .unwrap_or_default(),
+ snapshot_log: value.snapshot_log.unwrap_or_default(),
+ metadata_log: value.metadata_log.unwrap_or_default(),
+ sort_orders: HashMap::from_iter(
+ value
+ .sort_orders
+ .into_iter()
+ .map(|x| (x.order_id, Arc::new(x))),
+ ),
+ default_sort_order_id: value.default_sort_order_id,
+ refs: value.refs.unwrap_or_else(|| {
+ if let Some(snapshot_id) = current_snapshot_id {
+ HashMap::from_iter(vec![(MAIN_BRANCH.to_string(),
SnapshotReference {
+ snapshot_id,
+ retention: SnapshotRetention::Branch {
+ min_snapshots_to_keep: None,
+ max_snapshot_age_ms: None,
+ max_ref_age_ms: None,
+ },
+ })])
+ } else {
+ HashMap::new()
+ }
+ }),
+ statistics: index_statistics(value.statistics),
+ partition_statistics:
index_partition_statistics(value.partition_statistics),
+ encryption_keys: encryption_keys
+ .map(|keys| {
+ HashMap::from_iter(keys.into_iter().map(|key|
(key.key_id.clone(), key)))
+ })
+ .unwrap_or_default(),
+ next_row_id,
+ };
+
+ metadata.borrow_mut().try_normalize()?;
+ Ok(metadata)
+ }
+ }
+
impl TryFrom<TableMetadataV2> for TableMetadata {
type Error = Error;
fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
+ let snapshots = value.snapshots;
+ let value = value.shared;
let current_snapshot_id = if let &Some(-1) =
&value.current_snapshot_id {
None
} else {
@@ -882,8 +1041,7 @@ pub(super) mod _serde {
last_partition_id: value.last_partition_id,
properties: value.properties.unwrap_or_default(),
current_snapshot_id,
- snapshots: value
- .snapshots
+ snapshots: snapshots
.map(|snapshots| {
HashMap::from_iter(
snapshots
@@ -918,6 +1076,7 @@ pub(super) mod _serde {
statistics: index_statistics(value.statistics),
partition_statistics:
index_partition_statistics(value.partition_statistics),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
metadata.borrow_mut().try_normalize()?;
@@ -1072,6 +1231,7 @@ pub(super) mod _serde {
statistics: index_statistics(value.statistics),
partition_statistics:
index_partition_statistics(value.partition_statistics),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
};
metadata.borrow_mut().try_normalize()?;
@@ -1079,10 +1239,63 @@ pub(super) mod _serde {
}
}
+ impl TryFrom<TableMetadata> for TableMetadataV3 {
+ type Error = Error;
+
+ fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
+ let next_row_id = v.next_row_id;
+ let encryption_keys = std::mem::take(&mut v.encryption_keys);
+ let snapshots = std::mem::take(&mut v.snapshots);
+ let shared = v.into();
+
+ Ok(TableMetadataV3 {
+ format_version: VersionNumber::<3>,
+ shared,
+ next_row_id,
+ encryption_keys: if encryption_keys.is_empty() {
+ None
+ } else {
+ Some(encryption_keys.into_values().collect())
+ },
+ snapshots: if snapshots.is_empty() {
+ None
+ } else {
+ Some(
+ snapshots
+ .into_values()
+ .map(|s|
SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
+ .collect::<Result<_, _>>()?,
+ )
+ },
+ })
+ }
+ }
+
impl From<TableMetadata> for TableMetadataV2 {
- fn from(v: TableMetadata) -> Self {
+ fn from(mut v: TableMetadata) -> Self {
+ let snapshots = std::mem::take(&mut v.snapshots);
+ let shared = v.into();
+
TableMetadataV2 {
format_version: VersionNumber::<2>,
+ shared,
+ snapshots: if snapshots.is_empty() {
+ None
+ } else {
+ Some(
+ snapshots
+ .into_values()
+ .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
+ .collect(),
+ )
+ },
+ }
+ }
+ }
+
+ impl From<TableMetadata> for TableMetadataV2V3Shared {
+ fn from(v: TableMetadata) -> Self {
+ TableMetadataV2V3Shared {
table_uuid: v.table_uuid,
location: v.location,
last_sequence_number: v.last_sequence_number,
@@ -1111,20 +1324,6 @@ pub(super) mod _serde {
Some(v.properties)
},
current_snapshot_id: v.current_snapshot_id,
- snapshots: if v.snapshots.is_empty() {
- None
- } else {
- Some(
- v.snapshots
- .into_values()
- .map(|x| {
- Arc::try_unwrap(x)
- .unwrap_or_else(|snapshot|
snapshot.as_ref().clone())
- .into()
- })
- .collect(),
- )
- },
snapshot_log: if v.snapshot_log.is_empty() {
None
} else {
@@ -1254,6 +1453,8 @@ pub enum FormatVersion {
V1 = 1u8,
/// Iceberg spec version 2
V2 = 2u8,
+ /// Iceberg spec version 3
+ V3 = 3u8,
}
impl PartialOrd for FormatVersion {
@@ -1273,6 +1474,7 @@ impl Display for FormatVersion {
match self {
FormatVersion::V1 => write!(f, "v1"),
FormatVersion::V2 => write!(f, "v2"),
+ FormatVersion::V3 => write!(f, "v3"),
}
}
}
@@ -1317,6 +1519,7 @@ mod tests {
use std::sync::Arc;
use anyhow::Result;
+ use base64::Engine as _;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use uuid::Uuid;
@@ -1326,9 +1529,10 @@ mod tests {
use crate::io::FileIOBuilder;
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
- BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec,
PartitionStatisticsFile,
- PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention,
SortDirection,
- SortField, SortOrder, StatisticsFile, Summary, Transform, Type,
UnboundPartitionField,
+ BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField,
NullOrder, Operation,
+ PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral,
PrimitiveType, Schema, Snapshot,
+ SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder, StatisticsFile,
+ Summary, Transform, Type, UnboundPartitionField,
};
fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
@@ -1474,6 +1678,183 @@ mod tests {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
+ };
+
+ let expected_json_value = serde_json::to_value(&expected).unwrap();
+ check_table_metadata_serde(data, expected);
+
+ let json_value =
serde_json::from_str::<serde_json::Value>(data).unwrap();
+ assert_eq!(json_value, expected_json_value);
+ }
+
+ #[test]
+ fn test_table_data_v3() {
+ let data = r#"
+ {
+ "format-version" : 3,
+ "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
+ "location": "s3://b/wh/data.db/table",
+ "last-sequence-number" : 1,
+ "last-updated-ms": 1515100955770,
+ "last-column-id": 1,
+ "next-row-id": 5,
+ "schemas": [
+ {
+ "schema-id" : 1,
+ "type" : "struct",
+ "fields" :[
+ {
+ "id": 4,
+ "name": "ts",
+ "required": true,
+ "type": "timestamp"
+ }
+ ]
+ }
+ ],
+ "current-schema-id" : 1,
+ "partition-specs": [
+ {
+ "spec-id": 0,
+ "fields": [
+ {
+ "source-id": 4,
+ "field-id": 1000,
+ "name": "ts_day",
+ "transform": "day"
+ }
+ ]
+ }
+ ],
+ "default-spec-id": 0,
+ "last-partition-id": 1000,
+ "properties": {
+ "commit.retry.num-retries": "1"
+ },
+ "metadata-log": [
+ {
+ "metadata-file": "s3://bucket/.../v1.json",
+ "timestamp-ms": 1515100
+ }
+ ],
+ "refs": {},
+ "snapshots" : [ {
+ "snapshot-id" : 1,
+ "timestamp-ms" : 1662532818843,
+ "sequence-number" : 0,
+ "first-row-id" : 0,
+ "added-rows" : 4,
+ "key-id" : "key1",
+ "summary" : {
+ "operation" : "append"
+ },
+ "manifest-list" :
"/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
+ "schema-id" : 0
+ }
+ ],
+ "encryption-keys": [
+ {
+ "key-id": "key1",
+ "encrypted-by-id": "KMS",
+ "encrypted-key-metadata":
"c29tZS1lbmNyeXB0aW9uLWtleQ==",
+ "properties": {
+ "p1": "v1"
+ }
+ }
+ ],
+ "sort-orders": [
+ {
+ "order-id": 0,
+ "fields": []
+ }
+ ],
+ "default-sort-order-id": 0
+ }
+ "#;
+
+ let schema = Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![Arc::new(NestedField::required(
+ 4,
+ "ts",
+ Type::Primitive(PrimitiveType::Timestamp),
+ ))])
+ .build()
+ .unwrap();
+
+ let partition_spec = PartitionSpec::builder(schema.clone())
+ .with_spec_id(0)
+ .add_unbound_field(UnboundPartitionField {
+ name: "ts_day".to_string(),
+ transform: Transform::Day,
+ source_id: 4,
+ field_id: Some(1000),
+ })
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let snapshot = Snapshot::builder()
+ .with_snapshot_id(1)
+ .with_timestamp_ms(1662532818843)
+ .with_sequence_number(0)
+ .with_row_range(0, 4)
+ .with_encryption_key_id(Some("key1".to_string()))
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
+ .with_schema_id(0)
+ .build();
+
+ let encryption_key = EncryptedKey::builder()
+ .key_id("key1".to_string())
+ .encrypted_by_id("KMS".to_string())
+ .encrypted_key_metadata(
+ base64::prelude::BASE64_STANDARD
+ .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
+ .unwrap(),
+ )
+ .properties(HashMap::from_iter(vec![(
+ "p1".to_string(),
+ "v1".to_string(),
+ )]))
+ .build();
+
+ let default_partition_type =
partition_spec.partition_type(&schema).unwrap();
+ let expected = TableMetadata {
+ format_version: FormatVersion::V3,
+ table_uuid:
Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
+ location: "s3://b/wh/data.db/table".to_string(),
+ last_updated_ms: 1515100955770,
+ last_column_id: 1,
+ schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+ current_schema_id: 1,
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.clone().into())]),
+ default_partition_type,
+ default_spec: partition_spec.into(),
+ last_partition_id: 1000,
+ default_sort_order_id: 0,
+ sort_orders: HashMap::from_iter(vec![(0,
SortOrder::unsorted_order().into())]),
+ snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
+ current_snapshot_id: None,
+ last_sequence_number: 1,
+ properties: HashMap::from_iter(vec![(
+ "commit.retry.num-retries".to_string(),
+ "1".to_string(),
+ )]),
+ snapshot_log: Vec::new(),
+ metadata_log: vec![MetadataLog {
+ metadata_file: "s3://bucket/.../v1.json".to_string(),
+ timestamp_ms: 1515100,
+ }],
+ refs: HashMap::new(),
+ statistics: HashMap::new(),
+ partition_statistics: HashMap::new(),
+ encryption_keys: HashMap::from_iter(vec![("key1".to_string(),
encryption_key)]),
+ next_row_id: 5,
};
let expected_json_value = serde_json::to_value(&expected).unwrap();
@@ -1650,6 +2031,7 @@ mod tests {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
check_table_metadata_serde(data, expected);
@@ -1748,6 +2130,7 @@ mod tests {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
let expected_json_value = serde_json::to_value(&expected).unwrap();
@@ -2282,6 +2665,7 @@ mod tests {
},
})]),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
check_table_metadata_serde(data, expected);
@@ -2417,6 +2801,7 @@ mod tests {
},
})]),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
check_table_metadata_serde(data, expected);
@@ -2445,6 +2830,95 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_table_metadata_v3_valid_minimal() {
+ let metadata_str =
+
fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
+
+ let table_metadata =
serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
+ assert_eq!(table_metadata.format_version, FormatVersion::V3);
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ Arc::new(
+ NestedField::required(1, "x",
Type::Primitive(PrimitiveType::Long))
+
.with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
+
.with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
+ ),
+ Arc::new(
+ NestedField::required(2, "y",
Type::Primitive(PrimitiveType::Long))
+ .with_doc("comment"),
+ ),
+ Arc::new(NestedField::required(
+ 3,
+ "z",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = PartitionSpec::builder(schema.clone())
+ .with_spec_id(0)
+ .add_unbound_field(UnboundPartitionField {
+ name: "x".to_string(),
+ transform: Transform::Identity,
+ source_id: 1,
+ field_id: Some(1000),
+ })
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let sort_order = SortOrder::builder()
+ .with_order_id(3)
+ .with_sort_field(SortField {
+ source_id: 2,
+ transform: Transform::Identity,
+ direction: SortDirection::Ascending,
+ null_order: NullOrder::First,
+ })
+ .with_sort_field(SortField {
+ source_id: 3,
+ transform: Transform::Bucket(4),
+ direction: SortDirection::Descending,
+ null_order: NullOrder::Last,
+ })
+ .build_unbound()
+ .unwrap();
+
+ let default_partition_type =
partition_spec.partition_type(&schema).unwrap();
+ let expected = TableMetadata {
+ format_version: FormatVersion::V3,
+ table_uuid:
Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
+ location: "s3://bucket/test/location".to_string(),
+ last_updated_ms: 1602638573590,
+ last_column_id: 3,
+ schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
+ current_schema_id: 0,
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.clone().into())]),
+ default_spec: Arc::new(partition_spec),
+ default_partition_type,
+ last_partition_id: 1000,
+ default_sort_order_id: 3,
+ sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
+ snapshots: HashMap::default(),
+ current_snapshot_id: None,
+ last_sequence_number: 34,
+ properties: HashMap::new(),
+ snapshot_log: Vec::new(),
+ metadata_log: Vec::new(),
+ refs: HashMap::new(),
+ statistics: HashMap::new(),
+ partition_statistics: HashMap::new(),
+ encryption_keys: HashMap::new(),
+ next_row_id: 0, // V3 specific field from the JSON
+ };
+
+ check_table_metadata_serde(&metadata_str, expected);
+ }
+
#[test]
fn test_table_metadata_v2_file_valid() {
let metadata =
@@ -2579,6 +3053,7 @@ mod tests {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
check_table_metadata_serde(&metadata, expected);
@@ -2664,6 +3139,7 @@ mod tests {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
check_table_metadata_serde(&metadata, expected);
@@ -2733,6 +3209,7 @@ mod tests {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
};
check_table_metadata_serde(&metadata, expected);
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 25af8c30..6b8ce1e6 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -28,6 +28,7 @@ use super::{
UnboundPartitionSpec,
};
use crate::error::{Error, ErrorKind, Result};
+use crate::spec::{EncryptedKey, INITIAL_ROW_ID,
MIN_FORMAT_VERSION_ROW_LINEAGE};
use crate::{TableCreation, TableUpdate};
const FIRST_FIELD_ID: u32 = 1;
@@ -120,6 +121,7 @@ impl TableMetadataBuilder {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: INITIAL_ROW_ID,
},
last_updated_ms: None,
changes: vec![],
@@ -170,6 +172,7 @@ impl TableMetadataBuilder {
partition_spec,
sort_order,
properties,
+ format_version,
} = table_creation;
let location = location.ok_or_else(|| {
@@ -188,7 +191,7 @@ impl TableMetadataBuilder {
partition_spec,
sort_order.unwrap_or(SortOrder::unsorted_order()),
location,
- FormatVersion::V2,
+ format_version,
properties,
)
}
@@ -228,6 +231,11 @@ impl TableMetadataBuilder {
self.changes
.push(TableUpdate::UpgradeFormatVersion {
format_version });
}
+ FormatVersion::V3 => {
+ self.metadata.format_version = format_version;
+ self.changes
+ .push(TableUpdate::UpgradeFormatVersion {
format_version });
+ }
}
}
@@ -329,6 +337,9 @@ impl TableMetadataBuilder {
/// # Errors
/// - Snapshot id already exists.
/// - For format version > 1: the sequence number of the snapshot is lower
than the highest sequence number specified so far.
+ /// - For format version >= 3: the first-row-id of the snapshot is lower
than the next-row-id of the table.
+ /// - For format version >= 3: added-rows is null or first-row-id is null.
+ /// - For format version >= 3: next-row-id would overflow when adding
added-rows.
pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
if self
.metadata
@@ -385,6 +396,43 @@ impl TableMetadataBuilder {
));
}
+ let mut added_rows = None;
+ if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
+ if let Some((first_row_id, added_rows_count)) =
snapshot.row_range() {
+ if first_row_id < self.metadata.next_row_id {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot add a snapshot, first-row-id is behind
table next-row-id: {first_row_id} < {}",
+ self.metadata.next_row_id
+ ),
+ ));
+ }
+
+ added_rows = Some(added_rows_count);
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot add a snapshot: first-row-id is null.
first-row-id must be set for format version >=
{MIN_FORMAT_VERSION_ROW_LINEAGE}",
+ ),
+ ));
+ }
+ }
+
+ if let Some(added_rows) = added_rows {
+ self.metadata.next_row_id = self
+ .metadata
+ .next_row_id
+ .checked_add(added_rows)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Cannot add snapshot: next-row-id overflowed when
adding added-rows",
+ )
+ })?;
+ }
+
// Mutation happens in next line - must be infallible from here
self.changes.push(TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
@@ -1014,6 +1062,31 @@ impl TableMetadataBuilder {
.set_default_sort_order(Self::LAST_ADDED as i64)
}
+ /// Add an encryption key to the table metadata.
+ pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
+ let key_id = key.key_id().to_string();
+ if self.metadata.encryption_keys.contains_key(&key_id) {
+ // already exists
+ return self;
+ }
+
+ self.metadata.encryption_keys.insert(key_id, key.clone());
+ self.changes.push(TableUpdate::AddEncryptionKey {
+ encryption_key: key,
+ });
+ self
+ }
+
+ /// Remove an encryption key from the table metadata.
+ pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
+ if self.metadata.encryption_keys.remove(key_id).is_some() {
+ self.changes.push(TableUpdate::RemoveEncryptionKey {
+ key_id: key_id.to_string(),
+ });
+ }
+ self
+ }
+
/// Build the table metadata.
pub fn build(mut self) -> Result<TableMetadataBuildResult> {
self.metadata.last_updated_ms = self
@@ -2988,4 +3061,382 @@ mod tests {
assert!(result.is_ok());
}
+
+ #[test]
+ fn test_row_lineage_addition() {
+ let new_rows = 30;
+ let base = builder_without_changes(FormatVersion::V3)
+ .build()
+ .unwrap()
+ .metadata;
+ let add_rows = Snapshot::builder()
+ .with_snapshot_id(0)
+ .with_timestamp_ms(base.last_updated_ms + 1)
+ .with_sequence_number(0)
+ .with_schema_id(0)
+ .with_manifest_list("foo")
+ .with_parent_snapshot_id(None)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .with_row_range(base.next_row_id(), new_rows)
+ .build();
+
+ let first_addition = base
+ .into_builder(None)
+ .add_snapshot(add_rows.clone())
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ assert_eq!(first_addition.next_row_id(), new_rows);
+
+ let add_more_rows = Snapshot::builder()
+ .with_snapshot_id(1)
+ .with_timestamp_ms(first_addition.last_updated_ms + 1)
+ .with_sequence_number(1)
+ .with_schema_id(0)
+ .with_manifest_list("foo")
+ .with_parent_snapshot_id(Some(0))
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .with_row_range(first_addition.next_row_id(), new_rows)
+ .build();
+
+ let second_addition = first_addition
+ .into_builder(None)
+ .add_snapshot(add_more_rows)
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+ assert_eq!(second_addition.next_row_id(), new_rows * 2);
+ }
+
+ #[test]
+ fn test_row_lineage_invalid_snapshot() {
+ let new_rows = 30;
+ let base = builder_without_changes(FormatVersion::V3)
+ .build()
+ .unwrap()
+ .metadata;
+
+ // add rows to check TableMetadata validation; Snapshot rejects
negative next-row-id
+ let add_rows = Snapshot::builder()
+ .with_snapshot_id(0)
+ .with_timestamp_ms(base.last_updated_ms + 1)
+ .with_sequence_number(0)
+ .with_schema_id(0)
+ .with_manifest_list("foo")
+ .with_parent_snapshot_id(None)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .with_row_range(base.next_row_id(), new_rows)
+ .build();
+
+ let added = base
+ .into_builder(None)
+ .add_snapshot(add_rows)
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ let invalid_new_rows = Snapshot::builder()
+ .with_snapshot_id(1)
+ .with_timestamp_ms(added.last_updated_ms + 1)
+ .with_sequence_number(1)
+ .with_schema_id(0)
+ .with_manifest_list("foo")
+ .with_parent_snapshot_id(Some(0))
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ // first_row_id is behind table next_row_id
+ .with_row_range(added.next_row_id() - 1, 10)
+ .build();
+
+ let err = added
+ .into_builder(None)
+ .add_snapshot(invalid_new_rows)
+ .unwrap_err();
+ assert!(
+ err.to_string().contains(
+ "Cannot add a snapshot, first-row-id is behind table
next-row-id: 29 < 30"
+ )
+ );
+ }
+
+ #[test]
+ fn test_row_lineage_append_branch() {
+ // Appends to a branch should still change next-row-id even if not on
main, these changes
+ // should also affect commits to main
+
+ let branch = "some_branch";
+
+ // Start with V3 metadata to support row lineage
+ let base = builder_without_changes(FormatVersion::V3)
+ .build()
+ .unwrap()
+ .metadata;
+
+ // Initial next_row_id should be 0
+ assert_eq!(base.next_row_id(), 0);
+
+ // Write to Branch - append 30 rows
+ let branch_snapshot_1 = Snapshot::builder()
+ .with_snapshot_id(1)
+ .with_timestamp_ms(base.last_updated_ms + 1)
+ .with_sequence_number(0)
+ .with_schema_id(0)
+ .with_manifest_list("foo")
+ .with_parent_snapshot_id(None)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .with_row_range(base.next_row_id(), 30)
+ .build();
+
+ let table_after_branch_1 = base
+ .into_builder(None)
+ .set_branch_snapshot(branch_snapshot_1.clone(), branch)
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ // Current snapshot should be null (no main branch snapshot yet)
+ assert!(table_after_branch_1.current_snapshot().is_none());
+
+ // Branch snapshot should have first_row_id = 0
+ let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
+ let branch_snap_1 = table_after_branch_1
+ .snapshots
+ .get(&branch_ref.snapshot_id)
+ .unwrap();
+ assert_eq!(branch_snap_1.first_row_id(), Some(0));
+
+ // Next row id should be 30
+ assert_eq!(table_after_branch_1.next_row_id(), 30);
+
+ // Write to Main - append 28 rows
+ let main_snapshot = Snapshot::builder()
+ .with_snapshot_id(2)
+ .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
+ .with_sequence_number(1)
+ .with_schema_id(0)
+ .with_manifest_list("bar")
+ .with_parent_snapshot_id(None)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .with_row_range(table_after_branch_1.next_row_id(), 28)
+ .build();
+
+ let table_after_main = table_after_branch_1
+ .into_builder(None)
+ .add_snapshot(main_snapshot.clone())
+ .unwrap()
+ .set_ref(MAIN_BRANCH, SnapshotReference {
+ snapshot_id: main_snapshot.snapshot_id(),
+ retention: SnapshotRetention::Branch {
+ min_snapshots_to_keep: None,
+ max_snapshot_age_ms: None,
+ max_ref_age_ms: None,
+ },
+ })
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ // Main snapshot should have first_row_id = 30
+ let current_snapshot = table_after_main.current_snapshot().unwrap();
+ assert_eq!(current_snapshot.first_row_id(), Some(30));
+
+ // Next row id should be 58 (30 + 28)
+ assert_eq!(table_after_main.next_row_id(), 58);
+
+ // Write again to branch - append 21 rows
+ let branch_snapshot_2 = Snapshot::builder()
+ .with_snapshot_id(3)
+ .with_timestamp_ms(table_after_main.last_updated_ms + 1)
+ .with_sequence_number(2)
+ .with_schema_id(0)
+ .with_manifest_list("baz")
+ .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
+ .with_summary(Summary {
+ operation: Operation::Append,
+ additional_properties: HashMap::new(),
+ })
+ .with_row_range(table_after_main.next_row_id(), 21)
+ .build();
+
+ let table_after_branch_2 = table_after_main
+ .into_builder(None)
+ .set_branch_snapshot(branch_snapshot_2.clone(), branch)
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ // Branch snapshot should have first_row_id = 58 (30 + 28)
+ let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
+ let branch_snap_2 = table_after_branch_2
+ .snapshots
+ .get(&branch_ref_2.snapshot_id)
+ .unwrap();
+ assert_eq!(branch_snap_2.first_row_id(), Some(58));
+
+ // Next row id should be 79 (30 + 28 + 21)
+ assert_eq!(table_after_branch_2.next_row_id(), 79);
+ }
+
+ #[test]
+ fn test_encryption_keys() {
+ let builder = builder_without_changes(FormatVersion::V2);
+
+ // Create test encryption keys
+ let encryption_key_1 = EncryptedKey::builder()
+ .key_id("key-1")
+ .encrypted_key_metadata(vec![1, 2, 3, 4])
+ .encrypted_by_id("encryption-service-1")
+ .properties(HashMap::from_iter(vec![(
+ "algorithm".to_string(),
+ "AES-256".to_string(),
+ )]))
+ .build();
+
+ let encryption_key_2 = EncryptedKey::builder()
+ .key_id("key-2")
+ .encrypted_key_metadata(vec![5, 6, 7, 8])
+ .encrypted_by_id("encryption-service-2")
+ .properties(HashMap::new())
+ .build();
+
+ // Add first encryption key
+ let build_result = builder
+ .add_encryption_key(encryption_key_1.clone())
+ .build()
+ .unwrap();
+
+ assert_eq!(build_result.changes.len(), 1);
+ assert_eq!(build_result.metadata.encryption_keys.len(), 1);
+ assert_eq!(
+ build_result.metadata.encryption_key("key-1"),
+ Some(&encryption_key_1)
+ );
+ assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
+ encryption_key: encryption_key_1.clone()
+ });
+
+ // Add second encryption key
+ let build_result = build_result
+ .metadata
+ .into_builder(Some(
+
"s3://bucket/test/location/metadata/metadata1.json".to_string(),
+ ))
+ .add_encryption_key(encryption_key_2.clone())
+ .build()
+ .unwrap();
+
+ assert_eq!(build_result.changes.len(), 1);
+ assert_eq!(build_result.metadata.encryption_keys.len(), 2);
+ assert_eq!(
+ build_result.metadata.encryption_key("key-1"),
+ Some(&encryption_key_1)
+ );
+ assert_eq!(
+ build_result.metadata.encryption_key("key-2"),
+ Some(&encryption_key_2)
+ );
+ assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
+ encryption_key: encryption_key_2.clone()
+ });
+
+ // Try to add duplicate key - should not create a change
+ let build_result = build_result
+ .metadata
+ .into_builder(Some(
+
"s3://bucket/test/location/metadata/metadata2.json".to_string(),
+ ))
+ .add_encryption_key(encryption_key_1.clone())
+ .build()
+ .unwrap();
+
+ assert_eq!(build_result.changes.len(), 0);
+ assert_eq!(build_result.metadata.encryption_keys.len(), 2);
+
+ // Remove first encryption key
+ let build_result = build_result
+ .metadata
+ .into_builder(Some(
+
"s3://bucket/test/location/metadata/metadata3.json".to_string(),
+ ))
+ .remove_encryption_key("key-1")
+ .build()
+ .unwrap();
+
+ assert_eq!(build_result.changes.len(), 1);
+ assert_eq!(build_result.metadata.encryption_keys.len(), 1);
+ assert_eq!(build_result.metadata.encryption_key("key-1"), None);
+ assert_eq!(
+ build_result.metadata.encryption_key("key-2"),
+ Some(&encryption_key_2)
+ );
+ assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
+ key_id: "key-1".to_string()
+ });
+
+ // Try to remove non-existent key - should not create a change
+ let build_result = build_result
+ .metadata
+ .into_builder(Some(
+
"s3://bucket/test/location/metadata/metadata4.json".to_string(),
+ ))
+ .remove_encryption_key("non-existent-key")
+ .build()
+ .unwrap();
+
+ assert_eq!(build_result.changes.len(), 0);
+ assert_eq!(build_result.metadata.encryption_keys.len(), 1);
+
+ // Test encryption_keys_iter()
+ let keys = build_result
+ .metadata
+ .encryption_keys_iter()
+ .collect::<Vec<_>>();
+ assert_eq!(keys.len(), 1);
+ assert_eq!(keys[0], &encryption_key_2);
+
+ // Remove last encryption key
+ let build_result = build_result
+ .metadata
+ .into_builder(Some(
+
"s3://bucket/test/location/metadata/metadata5.json".to_string(),
+ ))
+ .remove_encryption_key("key-2")
+ .build()
+ .unwrap();
+
+ assert_eq!(build_result.changes.len(), 1);
+ assert_eq!(build_result.metadata.encryption_keys.len(), 0);
+ assert_eq!(build_result.metadata.encryption_key("key-2"), None);
+ assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
+ key_id: "key-2".to_string()
+ });
+
+ // Verify empty encryption_keys_iter()
+ let keys = build_result.metadata.encryption_keys_iter();
+ assert_eq!(keys.len(), 0);
+ }
}
diff --git a/crates/iceberg/src/transaction/mod.rs
b/crates/iceberg/src/transaction/mod.rs
index 26bd6522..4116264a 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -243,7 +243,7 @@ mod tests {
use crate::spec::TableMetadata;
use crate::table::Table;
use crate::transaction::{ApplyTransactionAction, Transaction};
- use crate::{Error, ErrorKind, TableIdent};
+ use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};
pub fn make_v1_table() -> Table {
let file = File::open(format!(
@@ -302,8 +302,41 @@ mod tests {
.unwrap()
}
+ pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl
Catalog) -> Table {
+ let table_ident =
+ TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()),
"test1".to_string()])
+ .unwrap();
+
+ catalog
+ .create_namespace(table_ident.namespace(), HashMap::new())
+ .await
+ .unwrap();
+
+ let file = File::open(format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV3ValidMinimal.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let base_metadata = serde_json::from_reader::<_,
TableMetadata>(reader).unwrap();
+
+ let table_creation = TableCreation::builder()
+ .schema((**base_metadata.current_schema()).clone())
+ .partition_spec((**base_metadata.default_partition_spec()).clone())
+ .sort_order((**base_metadata.default_sort_order()).clone())
+ .name(table_ident.name().to_string())
+ .format_version(crate::spec::FormatVersion::V3)
+ .build();
+
+ catalog
+ .create_table(table_ident.namespace(), table_creation)
+ .await
+ .unwrap()
+ }
+
/// Helper function to create a test table with retry properties
- fn setup_test_table(num_retries: &str) -> Table {
+ pub(super) fn setup_test_table(num_retries: &str) -> Table {
let table = make_v2_table();
// Set retry properties
@@ -469,3 +502,88 @@ mod tests {
}
}
}
+
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a dummy Parquet data file that claims to hold `rows` records.
    fn file_with_rows(rows: u64) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(format!("test/{}.parquet", rows))
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(rows)
            .partition(Struct::from_iter([Some(Literal::long(0))]))
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    /// V3 row lineage: each fast append must receive a contiguous row-id
    /// range, advancing the table's `next-row-id` by the appended row count.
    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created V3 table has assigned no row ids yet.
        assert_eq!(table.metadata().next_row_id(), 0);

        // First fast append: a single file with 30 rows.
        let txn = Transaction::new(&table);
        let append = txn.fast_append().add_data_files(vec![file_with_rows(30)]);
        let txn = append.apply(txn).unwrap();
        let table = txn.commit(&catalog).await.unwrap();

        // The new snapshot starts at row id 0 and consumes ids [0, 30).
        let current = table.metadata().current_snapshot().unwrap();
        assert_eq!(current.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        // The written manifest entry must carry the snapshot's first row id.
        let manifests = current
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifests.entries().len(), 1);
        assert_eq!(manifests.entries()[0].first_row_id, Some(0));

        // Second fast append: two files with 17 and 11 rows respectively.
        let txn = Transaction::new(&table);
        let append = txn
            .fast_append()
            .add_data_files(vec![file_with_rows(17), file_with_rows(11)]);
        let txn = append.apply(txn).unwrap();
        let table = txn.commit(&catalog).await.unwrap();

        // Row ids continue where the previous append stopped.
        let current = table.metadata().current_snapshot().unwrap();
        assert_eq!(current.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        // The second manifest in the list begins at the snapshot's first row id.
        let manifests = current
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifests.entries().len(), 2);
        assert_eq!(manifests.entries()[1].first_row_id, Some(30));
    }
}
diff --git a/crates/iceberg/src/transaction/snapshot.rs
b/crates/iceberg/src/transaction/snapshot.rs
index 93dd819d..4f85962f 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -206,13 +206,16 @@ impl<'a> SnapshotProducer<'a> {
.as_ref()
.clone(),
);
- if self.table.metadata().format_version() == FormatVersion::V1 {
- Ok(builder.build_v1())
- } else {
- match content {
+ match self.table.metadata().format_version() {
+ FormatVersion::V1 => Ok(builder.build_v1()),
+ FormatVersion::V2 => match content {
ManifestContentType::Data => Ok(builder.build_v2_data()),
ManifestContentType::Deletes => Ok(builder.build_v2_deletes()),
- }
+ },
+ FormatVersion::V3 => match content {
+ ManifestContentType::Data => Ok(builder.build_v3_data()),
+ ManifestContentType::Deletes => Ok(builder.build_v3_deletes()),
+ },
}
}
@@ -382,6 +385,7 @@ impl<'a> SnapshotProducer<'a> {
) -> Result<ActionCommit> {
let manifest_list_path = self.generate_manifest_list_file_path(0);
let next_seq_num = self.table.metadata().next_sequence_number();
+ let first_row_id = self.table.metadata().next_row_id();
let mut manifest_list_writer = match
self.table.metadata().format_version() {
FormatVersion::V1 => ManifestListWriter::v1(
self.table
@@ -398,6 +402,15 @@ impl<'a> SnapshotProducer<'a> {
self.table.metadata().current_snapshot_id(),
next_seq_num,
),
+ FormatVersion::V3 => ManifestListWriter::v3(
+ self.table
+ .file_io()
+ .new_output(manifest_list_path.clone())?,
+ self.snapshot_id,
+ self.table.metadata().current_snapshot_id(),
+ next_seq_num,
+ Some(first_row_id),
+ ),
};
// Calling self.summary() before self.manifest_file() is important
because self.added_data_files
@@ -412,6 +425,7 @@ impl<'a> SnapshotProducer<'a> {
.await?;
manifest_list_writer.add_manifests(new_manifests.into_iter())?;
+ let writer_next_row_id = manifest_list_writer.next_row_id();
manifest_list_writer.close().await?;
let commit_ts = chrono::Utc::now().timestamp_millis();
@@ -422,8 +436,16 @@ impl<'a> SnapshotProducer<'a> {
.with_sequence_number(next_seq_num)
.with_summary(summary)
.with_schema_id(self.table.metadata().current_schema_id())
- .with_timestamp_ms(commit_ts)
- .build();
+ .with_timestamp_ms(commit_ts);
+
+ let new_snapshot = if let Some(writer_next_row_id) =
writer_next_row_id {
+ let assigned_rows = writer_next_row_id -
self.table.metadata().next_row_id();
+ new_snapshot
+ .with_row_range(first_row_id, assigned_rows)
+ .build()
+ } else {
+ new_snapshot.build()
+ };
let updates = vec![
TableUpdate::AddSnapshot {
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs
b/crates/iceberg/src/writer/file_writer/location_generator.rs
index 4cfc2784..a5cfc282 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -183,6 +183,7 @@ pub(crate) mod test {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: 0,
};
let file_name_generator = super::DefaultFileNameGenerator::new(
@@ -297,6 +298,7 @@ pub(crate) mod test {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
encryption_keys: HashMap::new(),
+ next_row_id: 0,
};
// Test with DefaultLocationGenerator
diff --git
a/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json
b/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json
new file mode 100644
index 00000000..bf85114c
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json
@@ -0,0 +1,74 @@
+{
+ "format-version": 3,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 34,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 3,
+ "current-schema-id": 0,
+ "next-row-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long",
+ "initial-default": 1,
+ "write-default": 1
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long",
+ "doc": "comment"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ]
+ }
+ ],
+ "default-spec-id": 0,
+ "partition-specs": [
+ {
+ "spec-id": 0,
+ "fields": [
+ {
+ "name": "x",
+ "transform": "identity",
+ "source-id": 1,
+ "field-id": 1000
+ }
+ ]
+ }
+ ],
+ "last-partition-id": 1000,
+ "default-sort-order-id": 3,
+ "sort-orders": [
+ {
+ "order-id": 3,
+ "fields": [
+ {
+ "transform": "identity",
+ "source-id": 2,
+ "direction": "asc",
+ "null-order": "nulls-first"
+ },
+ {
+ "transform": "bucket[4]",
+ "source-id": 3,
+ "direction": "desc",
+ "null-order": "nulls-last"
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file