This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d3b3ab10 fix: TableUpdate Snapshot deserialization for v1 (#656)
d3b3ab10 is described below
commit d3b3ab102630845ede1b5cdfe1b67d52a784950d
Author: Christian <[email protected]>
AuthorDate: Thu Oct 3 06:09:37 2024 +0200
fix: TableUpdate Snapshot deserialization for v1 (#656)
---
crates/iceberg/src/catalog/mod.rs | 96 +++++++++++++++++++++++++++++++++++++++
1 file changed, 96 insertions(+)
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 54abe808..854c1269 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -22,6 +22,7 @@ use std::fmt::Debug;
use std::mem::take;
use std::ops::Deref;
+use _serde::deserialize_snapshot;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
@@ -401,6 +402,7 @@ pub enum TableUpdate {
#[serde(rename_all = "kebab-case")]
AddSnapshot {
/// Snapshot to add.
+ #[serde(deserialize_with = "deserialize_snapshot")]
snapshot: Snapshot,
},
/// Set table's snapshot ref.
@@ -451,6 +453,65 @@ impl TableUpdate {
}
}
+pub(super) mod _serde {
+ use serde::{Deserialize as _, Deserializer};
+
+ use super::*;
+ use crate::spec::{SchemaId, Summary};
+
+ pub(super) fn deserialize_snapshot<'de, D>(
+ deserializer: D,
+ ) -> std::result::Result<Snapshot, D::Error>
+ where D: Deserializer<'de> {
+ let buf = CatalogSnapshot::deserialize(deserializer)?;
+ Ok(buf.into())
+ }
+
+ #[derive(Debug, Deserialize, PartialEq, Eq)]
+ #[serde(rename_all = "kebab-case")]
+ /// Defines the structure of a v2 snapshot for the catalog.
+ /// Main difference to SnapshotV2 is that sequence-number is optional
+ /// in the rest catalog spec to allow for backwards compatibility with v1.
+ struct CatalogSnapshot {
+ snapshot_id: i64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ parent_snapshot_id: Option<i64>,
+ #[serde(default)]
+ sequence_number: i64,
+ timestamp_ms: i64,
+ manifest_list: String,
+ summary: Summary,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ schema_id: Option<SchemaId>,
+ }
+
+ impl From<CatalogSnapshot> for Snapshot {
+ fn from(snapshot: CatalogSnapshot) -> Self {
+ let CatalogSnapshot {
+ snapshot_id,
+ parent_snapshot_id,
+ sequence_number,
+ timestamp_ms,
+ manifest_list,
+ schema_id,
+ summary,
+ } = snapshot;
+ let builder = Snapshot::builder()
+ .with_snapshot_id(snapshot_id)
+ .with_parent_snapshot_id(parent_snapshot_id)
+ .with_sequence_number(sequence_number)
+ .with_timestamp_ms(timestamp_ms)
+ .with_manifest_list(manifest_list)
+ .with_summary(summary);
+ if let Some(schema_id) = schema_id {
+ builder.with_schema_id(schema_id).build()
+ } else {
+ builder.build()
+ }
+ }
+ }
+}
+
/// ViewCreation represents the creation of a view in the catalog.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
@@ -968,6 +1029,41 @@ mod tests {
test_serde_json(json, update);
}
+ #[test]
+ fn test_add_snapshot_v1() {
+ let json = r#"
+{
+ "action": "add-snapshot",
+ "snapshot": {
+ "snapshot-id": 3055729675574597000,
+ "parent-snapshot-id": 3051729675574597000,
+ "timestamp-ms": 1555100955770,
+ "summary": {
+ "operation": "append"
+ },
+ "manifest-list": "s3://a/b/2.avro"
+ }
+}
+ "#;
+
+ let update = TableUpdate::AddSnapshot {
+ snapshot: Snapshot::builder()
+ .with_snapshot_id(3055729675574597000)
+ .with_parent_snapshot_id(Some(3051729675574597000))
+ .with_timestamp_ms(1555100955770)
+ .with_sequence_number(0)
+ .with_manifest_list("s3://a/b/2.avro")
+ .with_summary(Summary {
+ operation: Operation::Append,
+ other: HashMap::default(),
+ })
+ .build(),
+ };
+
+ let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
+ assert_eq!(actual, update, "Parsed value is not equal to expected");
+ }
+
#[test]
fn test_remove_snapshots() {
let json = r#"