This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c6cf0e98 fix: TableMetadata `last_updated_ms` not increased for all
operations (#978)
c6cf0e98 is described below
commit c6cf0e98c49c3c27241a11883f053084927225bf
Author: Christian <[email protected]>
AuthorDate: Thu Feb 20 03:59:19 2025 +0100
fix: TableMetadata `last_updated_ms` not increased for all operations (#978)
Currently we increase the `last_updated_ms` timestamp only if a snapshot
was added.
Java always updates this timestamp, even if, e.g., only properties were
added.
Trino has a catalog integration test that validates this behavior, which
we previously failed.
This PR ensures that `last_updated_ms` is updated on every build.
---
crates/iceberg/src/spec/table_metadata_builder.rs | 61 +++++++++++++++++++----
1 file changed, 51 insertions(+), 10 deletions(-)
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs
b/crates/iceberg/src/spec/table_metadata_builder.rs
index cbf2e5e3..28e2f4e8 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -54,6 +54,7 @@ pub struct TableMetadataBuilder {
last_added_order_id: Option<i64>,
// None if this is a new table (from_metadata) method not used
previous_history_entry: Option<MetadataLog>,
+ last_updated_ms: Option<i64>,
}
#[derive(Debug, Clone, PartialEq)]
@@ -120,6 +121,7 @@ impl TableMetadataBuilder {
statistics: HashMap::new(),
partition_statistics: HashMap::new(),
},
+ last_updated_ms: None,
changes: vec![],
last_added_schema_id: Some(schema_id),
last_added_spec_id: None,
@@ -156,6 +158,7 @@ impl TableMetadataBuilder {
last_added_schema_id: None,
last_added_spec_id: None,
last_added_order_id: None,
+ last_updated_ms: None,
}
}
@@ -368,13 +371,17 @@ impl TableMetadataBuilder {
}
}
- if snapshot.timestamp_ms() - self.metadata.last_updated_ms <
-ONE_MINUTE_MS {
+ let max_last_updated = self
+ .last_updated_ms
+ .unwrap_or_default()
+ .max(self.metadata.last_updated_ms);
+ if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid snapshot timestamp {}: before last updated
timestamp {}",
snapshot.timestamp_ms(),
- self.metadata.last_updated_ms
+ max_last_updated
),
));
}
@@ -384,7 +391,7 @@ impl TableMetadataBuilder {
snapshot: snapshot.clone(),
});
- self.metadata.last_updated_ms = snapshot.timestamp_ms();
+ self.last_updated_ms = Some(snapshot.timestamp_ms());
self.metadata.last_sequence_number = snapshot.sequence_number();
self.metadata
.snapshots
@@ -483,19 +490,23 @@ impl TableMetadataBuilder {
matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if
snap.snapshot_id() == snapshot.snapshot_id())
});
if is_added_snapshot {
- self.metadata.last_updated_ms = snapshot.timestamp_ms();
+ self.last_updated_ms = Some(snapshot.timestamp_ms());
}
// Current snapshot id is set only for the main branch
if ref_name == MAIN_BRANCH {
self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
- if self.metadata.last_updated_ms == i64::default() {
- self.metadata.last_updated_ms =
chrono::Utc::now().timestamp_millis();
+ let timestamp_ms = if let Some(last_updated_ms) =
self.last_updated_ms {
+ last_updated_ms
+ } else {
+ let last_updated_ms = chrono::Utc::now().timestamp_millis();
+ self.last_updated_ms = Some(last_updated_ms);
+ last_updated_ms
};
self.metadata.snapshot_log.push(SnapshotLog {
snapshot_id: snapshot.snapshot_id(),
- timestamp_ms: self.metadata.last_updated_ms,
+ timestamp_ms,
});
}
@@ -911,9 +922,9 @@ impl TableMetadataBuilder {
/// Build the table metadata.
pub fn build(mut self) -> Result<TableMetadataBuildResult> {
- if self.metadata.last_updated_ms == i64::default() {
- self.metadata.last_updated_ms =
chrono::Utc::now().timestamp_millis();
- }
+ self.metadata.last_updated_ms = self
+ .last_updated_ms
+ .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
// Check compatibility of the current schema to the default partition
spec and sort order.
// We use the `get_xxx` methods from the builder to avoid using the
panicking
@@ -1210,6 +1221,8 @@ impl From<TableMetadataBuildResult> for TableMetadata {
#[cfg(test)]
mod tests {
+ use std::thread::sleep;
+
use super::*;
use crate::spec::{
BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec,
PrimitiveType, Schema,
@@ -2341,4 +2354,32 @@ mod tests {
assert_eq!(build_result.metadata.partition_statistics.len(), 0);
assert_eq!(build_result.changes.len(), 0);
}
+
+ #[test]
+ fn last_update_increased_for_property_only_update() {
+ let builder = builder_without_changes(FormatVersion::V2);
+
+ let metadata = builder.build().unwrap().metadata;
+ let last_updated_ms = metadata.last_updated_ms;
+ sleep(std::time::Duration::from_millis(2));
+
+ let build_result = metadata
+ .into_builder(Some(
+
"s3://bucket/test/location/metadata/metadata1.json".to_string(),
+ ))
+ .set_properties(HashMap::from_iter(vec![(
+ "foo".to_string(),
+ "bar".to_string(),
+ )]))
+ .unwrap()
+ .build()
+ .unwrap();
+
+ assert!(
+ build_result.metadata.last_updated_ms > last_updated_ms,
+ "{} > {}",
+ build_result.metadata.last_updated_ms,
+ last_updated_ms
+ );
+ }
}