This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e0b7f352 Make `schema` and `partition_spec` optional for TableMetadataV1 (#1087)
e0b7f352 is described below
commit e0b7f352ed8857cb401db2c8e3aa97e61fe4bd71
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Mon Mar 31 22:09:58 2025 +0900
Make `schema` and `partition_spec` optional for TableMetadataV1 (#1087)
## Which issue does this PR close?
- Closes #1088
## What changes are included in this PR?
Fixes an issue reading V1 Iceberg tables created by AWS Glue. The table
metadata for V1 tables in Glue doesn't have the `schema` or
`partition-spec` fields that `TableMetadataV1` marks as required. Fix
this by making these fields optional and adding tests.
Edit: Glue is probably only incidental; these could be legacy tables
that were migrated into my Glue catalog.
## Are these changes tested?
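Yes. New tests verify that a Glue V1 Iceberg table can be parsed into a
`TableMetadata`, and cover the fallback and error paths for a missing
`current-schema-id`, a missing `schema`, and a missing `default-spec-id`.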
---
crates/iceberg/src/spec/table_metadata.rs | 237 ++++++++++++++++-----
.../table_metadata/TableMetadataV1Compat.json | 111 ++++++++++
.../TableMetadataV1NoValidSchema.json | 45 ++++
...leMetadataV1PartitionSpecsWithoutDefaultId.json | 58 +++++
.../TableMetadataV1SchemasWithoutCurrentId.json | 69 ++++++
5 files changed, 462 insertions(+), 58 deletions(-)
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index d5948af7..e9e99605 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -697,12 +697,14 @@ pub(super) mod _serde {
pub location: String,
pub last_updated_ms: i64,
pub last_column_id: i32,
- pub schema: SchemaV1,
+ /// `schema` is optional to prioritize `schemas` and
`current-schema-id`, allowing liberal reading of V1 metadata.
+ pub schema: Option<SchemaV1>,
#[serde(skip_serializing_if = "Option::is_none")]
pub schemas: Option<Vec<SchemaV1>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub current_schema_id: Option<i32>,
- pub partition_spec: Vec<PartitionField>,
+ /// `partition_spec` is optional to prioritize `partition_specs`,
aligning with liberal reading of potentially invalid V1 metadata.
+ pub partition_spec: Option<Vec<PartitionField>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub partition_specs: Option<Vec<PartitionSpec>>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -896,60 +898,80 @@ pub(super) mod _serde {
value.current_snapshot_id
};
- let schemas = value
- .schemas
- .map(|schemas| {
- Ok::<_, Error>(HashMap::from_iter(
- schemas
+ let (schemas, current_schema_id, current_schema) =
+ if let (Some(schemas_vec), Some(schema_id)) =
+ (&value.schemas, value.current_schema_id)
+ {
+ // Option 1: Use 'schemas' + 'current_schema_id'
+ let schema_map = HashMap::from_iter(
+ schemas_vec
+ .clone()
.into_iter()
- .enumerate()
- .map(|(i, schema)| {
- Ok((
- schema.schema_id.unwrap_or(i as i32),
- Arc::new(schema.try_into()?),
- ))
+ .map(|schema| {
+ let schema: Schema = schema.try_into()?;
+ Ok((schema.schema_id(), Arc::new(schema)))
})
- .collect::<Result<Vec<_>, Error>>()?
- .into_iter(),
- ))
- })
- .or_else(|| {
- Some(value.schema.try_into().map(|schema: Schema| {
- HashMap::from_iter(vec![(schema.schema_id(),
Arc::new(schema))])
- }))
- })
- .transpose()?
- .unwrap_or_default();
- let current_schema_id = value
- .current_schema_id
- .unwrap_or_else(||
schemas.keys().copied().max().unwrap_or_default());
- let current_schema = schemas
- .get(&current_schema_id)
- .ok_or_else(|| {
- Error::new(
+ .collect::<Result<Vec<_>, Error>>()?,
+ );
+
+ let schema = schema_map
+ .get(&schema_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No schema exists with the current schema
id {}.",
+ schema_id
+ ),
+ )
+ })?
+ .clone();
+ (schema_map, schema_id, schema)
+ } else if let Some(schema) = value.schema {
+ // Option 2: Fall back to `schema`
+ let schema: Schema = schema.try_into()?;
+ let schema_id = schema.schema_id();
+ let schema_arc = Arc::new(schema);
+ let schema_map = HashMap::from_iter(vec![(schema_id,
schema_arc.clone())]);
+ (schema_map, schema_id, schema_arc)
+ } else {
+ // Option 3: No valid schema configuration found
+ return Err(Error::new(
ErrorKind::DataInvalid,
- format!(
- "No schema exists with the current schema id {}.",
- current_schema_id
- ),
- )
- })?
- .clone();
+ "No valid schema configuration found in table
metadata",
+ ));
+ };
- let partition_specs = match value.partition_specs {
- Some(partition_specs) => partition_specs,
- None => vec![PartitionSpec::builder(current_schema.clone())
+ // Prioritize 'partition_specs' over 'partition_spec'
+ let partition_specs = if let Some(specs_vec) =
value.partition_specs {
+ // Option 1: Use 'partition_specs'
+ specs_vec
+ .into_iter()
+ .map(|x| (x.spec_id(), Arc::new(x)))
+ .collect::<HashMap<_, _>>()
+ } else if let Some(partition_spec) = value.partition_spec {
+ // Option 2: Fall back to 'partition_spec'
+ let spec = PartitionSpec::builder(current_schema.clone())
.with_spec_id(DEFAULT_PARTITION_SPEC_ID)
-
.add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))?
- .build()?],
- }
- .into_iter()
- .map(|x| (x.spec_id(), Arc::new(x)))
- .collect::<HashMap<_, _>>();
+ .add_unbound_fields(partition_spec.into_iter().map(|f|
f.into_unbound()))?
+ .build()?;
+ HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID,
Arc::new(spec))])
+ } else {
+ // Option 3: Create empty partition spec
+ let spec = PartitionSpec::builder(current_schema.clone())
+ .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
+ .build()?;
+
+ HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID,
Arc::new(spec))])
+ };
+
+ // Get the default_spec_id, prioritizing the explicit value if
provided
let default_spec_id = value
.default_spec_id
.unwrap_or_else(||
partition_specs.keys().copied().max().unwrap_or_default());
+
+ // Get the default spec
let default_spec: PartitionSpecRef = partition_specs
.get(&default_spec_id)
.map(|x| Arc::unwrap_or_clone(x.clone()))
@@ -1101,16 +1123,17 @@ pub(super) mod _serde {
location: v.location,
last_updated_ms: v.last_updated_ms,
last_column_id: v.last_column_id,
- schema: v
- .schemas
- .get(&v.current_schema_id)
- .ok_or(Error::new(
- ErrorKind::Unexpected,
- "current_schema_id not found in schemas",
- ))?
- .as_ref()
- .clone()
- .into(),
+ schema: Some(
+ v.schemas
+ .get(&v.current_schema_id)
+ .ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "current_schema_id not found in schemas",
+ ))?
+ .as_ref()
+ .clone()
+ .into(),
+ ),
schemas: Some(
v.schemas
.into_values()
@@ -1122,7 +1145,7 @@ pub(super) mod _serde {
.collect(),
),
current_schema_id: Some(v.current_schema_id),
- partition_spec: v.default_spec.fields().to_vec(),
+ partition_spec: Some(v.default_spec.fields().to_vec()),
partition_specs: Some(
v.partition_specs
.into_values()
@@ -2585,6 +2608,104 @@ mod tests {
check_table_metadata_serde(&metadata, expected);
}
+ // Regression test for #1088: V1 metadata (as seen in an AWS Glue catalog)
+ // that omits the legacy `schema` and `partition-spec` fields and relies on
+ // `schemas` + `current-schema-id` and `partition-specs` + `default-spec-id`
+ // instead must deserialize successfully.
+ #[test]
+ fn test_table_metadata_v1_compat() {
+ let metadata =
+
fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
+
+ // Deserialize the JSON; this failed while `schema` and `partition-spec`
+ // were non-optional fields of `TableMetadataV1`.
+ let desered_type: TableMetadata = serde_json::from_str(&metadata)
+ .expect("Failed to deserialize TableMetadataV1Compat.json");
+
+ // Spot-check values copied directly from the fixture (its
+ // `current-schema-id` is 0).
+ assert_eq!(desered_type.format_version(), FormatVersion::V1);
+ assert_eq!(
+ desered_type.uuid(),
+ Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
+ );
+ assert_eq!(
+ desered_type.location(),
+ "s3://bucket/warehouse/iceberg/glue.db/table_name"
+ );
+ assert_eq!(desered_type.last_updated_ms(), 1727773114005);
+ assert_eq!(desered_type.current_schema_id(), 0);
+ }
+
+ // `schemas` is present but `current-schema-id` is missing, so the
+ // `schemas` + `current-schema-id` path cannot be used and deserialization
+ // must fall back to the legacy `schema` field.
+ #[test]
+ fn test_table_metadata_v1_schemas_without_current_id() {
+ let metadata = fs::read_to_string(
+
"testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
+ )
+ .unwrap();
+
+ // Deserialize the JSON - this should succeed by using the 'schema'
+ // field instead of 'schemas'
+ let desered_type: TableMetadata = serde_json::from_str(&metadata)
+ .expect("Failed to deserialize
TableMetadataV1SchemasWithoutCurrentId.json");
+
+ // Verify basic metadata.
+ // NOTE(review): in the fixture, `schema` and `schemas[0]` declare the same
+ // fields, so the assertions below show that parsing succeeded but cannot
+ // tell which of the two was used; consider making them differ.
+ assert_eq!(desered_type.format_version(), FormatVersion::V1);
+ assert_eq!(
+ desered_type.uuid(),
+ Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
+ );
+
+ // Get the schema and verify it has the expected fields
+ let schema = desered_type.current_schema();
+ assert_eq!(schema.as_struct().fields().len(), 3);
+ assert_eq!(schema.as_struct().fields()[0].name, "x");
+ assert_eq!(schema.as_struct().fields()[1].name, "y");
+ assert_eq!(schema.as_struct().fields()[2].name, "z");
+ }
+
+ // `schemas` is present without `current-schema-id`, and there is no legacy
+ // `schema` field, so no schema can be selected: deserialization must fail
+ // with the "No valid schema configuration found" error.
+ #[test]
+ fn test_table_metadata_v1_no_valid_schema() {
+ let metadata =
+
fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
+ .unwrap();
+
+ // Deserialize the JSON - this should fail because neither schemas +
+ // current_schema_id nor schema is valid
+ let desered: Result<TableMetadata, serde_json::Error> =
serde_json::from_str(&metadata);
+
+ assert!(desered.is_err());
+ // The conversion error is surfaced through serde_json's error message.
+ let error_message = desered.unwrap_err().to_string();
+ assert!(
+ error_message.contains("No valid schema configuration found"),
+ "Expected error about no valid schema configuration, got: {}",
+ error_message
+ );
+ }
+
+ // V1 metadata with a legacy `schema`, `partition-specs` holding spec ids 1
+ // and 2, and no `default-spec-id`: the default spec id must be inferred as
+ // the largest key in `partition-specs`.
+ #[test]
+ fn test_table_metadata_v1_partition_specs_without_default_id() {
+ let metadata = fs::read_to_string(
+
"testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
+ )
+ .unwrap();
+
+ // Deserialize the JSON - this should succeed by inferring
+ // default_spec_id as the max spec ID
+ let desered_type: TableMetadata = serde_json::from_str(&metadata)
+ .expect("Failed to deserialize
TableMetadataV1PartitionSpecsWithoutDefaultId.json");
+
+ // Verify basic metadata
+ assert_eq!(desered_type.format_version(), FormatVersion::V1);
+ assert_eq!(
+ desered_type.uuid(),
+ Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
+ );
+
+ // Verify partition specs: should pick the largest spec ID (2)
+ assert_eq!(desered_type.default_partition_spec_id(), 2);
+ assert_eq!(desered_type.partition_specs.len(), 2);
+
+ // Verify the default spec has the expected fields (identity on `y`,
+ // source field id 2, as declared for spec-id 2 in the fixture)
+ let default_spec = &desered_type.default_spec;
+ assert_eq!(default_spec.spec_id(), 2);
+ assert_eq!(default_spec.fields().len(), 1);
+ assert_eq!(default_spec.fields()[0].name, "y");
+ assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
+ assert_eq!(default_spec.fields()[0].source_id, 2);
+ }
+
#[test]
fn test_table_metadata_v2_schema_not_found() {
let metadata =
diff --git a/crates/iceberg/testdata/table_metadata/TableMetadataV1Compat.json
b/crates/iceberg/testdata/table_metadata/TableMetadataV1Compat.json
new file mode 100644
index 00000000..f22c8621
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV1Compat.json
@@ -0,0 +1,111 @@
+{
+ "current-schema-id": 0,
+ "current-snapshot-id": 917896971991367610,
+ "default-sort-order-id": 0,
+ "default-spec-id": 0,
+ "format-version": 1,
+ "last-column-id": 4,
+ "last-partition-id": 999,
+ "last-updated-ms": 1727773114005,
+ "location": "s3://bucket/warehouse/iceberg/glue.db/table_name",
+ "metadata-log": [
+ {
+ "metadata-file":
"s3://bucket/warehouse/iceberg/glue.db/table_name/metadata/00405-a3e8a93b-cc7f-430b-a731-6fa4357fb94e.metadata.json",
+ "timestamp-ms": 1727772184043
+ }
+ ],
+ "partition-specs": [
+ {
+ "fields": [],
+ "spec-id": 0
+ }
+ ],
+ "partition-statistics-files": [],
+ "properties": {
+ "owner": "spark",
+ "history.expire.max-snapshot-age-ms": "18000000",
+ "write.metadata.previous-versions-max": "20",
+ "stampisoend": "20241001085028951",
+ "write.metadata.delete-after-commit.enabled": "true",
+ "history.expire.max-ref-age-ms": "2592000000",
+ "write.distribution-mode": "range",
+ "write.merge.distribution-mode": "range",
+ "history.expire.min-snapshots-to-keep": "5"
+ },
+ "refs": {
+ "main": {
+ "snapshot-id": 917896971991367610,
+ "type": "branch"
+ }
+ },
+ "schemas": [
+ {
+ "fields": [
+ {
+ "id": 1,
+ "name": "record_id",
+ "required": false,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "record_time",
+ "required": false,
+ "type": "timestamptz"
+ },
+ {
+ "id": 3,
+ "name": "id",
+ "required": false,
+ "type": "long"
+ },
+ {
+ "id": 4,
+ "name": "name",
+ "required": false,
+ "type": "string"
+ }
+ ],
+ "schema-id": 0,
+ "type": "struct"
+ }
+ ],
+ "snapshot-log": [
+ {
+ "snapshot-id": 917896971991367610,
+ "timestamp-ms": 1727766315299
+ }
+ ],
+ "snapshots": [
+ {
+ "manifest-list":
"s3://bucket/warehouse/iceberg/glue.db/table_name/metadata/snap-917896971991367610-1-d466808b-46f1-4486-a655-e50d07014597.avro",
+ "schema-id": 0,
+ "snapshot-id": 917896971991367610,
+ "summary": {
+ "added-data-files": "1",
+ "total-equality-deletes": "0",
+ "added-records": "3",
+ "deleted-data-files": "1",
+ "deleted-records": "3",
+ "total-records": "3",
+ "removed-files-size": "1366",
+ "changed-partition-count": "1",
+ "total-position-deletes": "0",
+ "added-files-size": "1366",
+ "total-delete-files": "0",
+ "total-files-size": "1366",
+ "total-data-files": "1",
+ "operation": "overwrite"
+ },
+ "timestamp-ms": 1727766315299
+ }
+ ],
+ "sort-orders": [
+ {
+ "fields": [],
+ "order-id": 0
+ }
+ ],
+ "statistics-files": [],
+ "table-uuid": "3276010d-7b1d-488c-98d8-9025fc4fde6b"
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/table_metadata/TableMetadataV1NoValidSchema.json
b/crates/iceberg/testdata/table_metadata/TableMetadataV1NoValidSchema.json
new file mode 100644
index 00000000..b29c2fa3
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV1NoValidSchema.json
@@ -0,0 +1,45 @@
+{
+ "format-version": 1,
+ "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+ "location": "s3://bucket/test/location",
+ "last-updated-ms": 1602638573874,
+ "last-column-id": 3,
+ "schemas": [
+ {
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long",
+ "doc": "comment"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ],
+ "schema-id": 0,
+ "type": "struct"
+ }
+ ],
+ "partition-spec": [
+ {
+ "name": "x",
+ "transform": "identity",
+ "source-id": 1,
+ "field-id": 1000
+ }
+ ],
+ "properties": {},
+ "current-snapshot-id": -1,
+ "snapshots": []
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
b/crates/iceberg/testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
new file mode 100644
index 00000000..6f4ed6a9
--- /dev/null
+++
b/crates/iceberg/testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
@@ -0,0 +1,58 @@
+{
+ "format-version": 1,
+ "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+ "location": "s3://bucket/test/location",
+ "last-updated-ms": 1602638573874,
+ "last-column-id": 3,
+ "schema": {
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long",
+ "doc": "comment"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ]
+ },
+ "partition-specs": [
+ {
+ "spec-id": 1,
+ "fields": [
+ {
+ "name": "x",
+ "transform": "identity",
+ "source-id": 1,
+ "field-id": 1000
+ }
+ ]
+ },
+ {
+ "spec-id": 2,
+ "fields": [
+ {
+ "name": "y",
+ "transform": "identity",
+ "source-id": 2,
+ "field-id": 1001
+ }
+ ]
+ }
+ ],
+ "properties": {},
+ "current-snapshot-id": -1,
+ "snapshots": []
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json
b/crates/iceberg/testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json
new file mode 100644
index 00000000..7276c2a8
--- /dev/null
+++
b/crates/iceberg/testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json
@@ -0,0 +1,69 @@
+{
+ "format-version": 1,
+ "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+ "location": "s3://bucket/test/location",
+ "last-updated-ms": 1602638573874,
+ "last-column-id": 3,
+ "schemas": [
+ {
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long",
+ "doc": "comment"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ],
+ "schema-id": 0,
+ "type": "struct"
+ }
+ ],
+ "schema": {
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long",
+ "doc": "comment"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ]
+ },
+ "partition-spec": [
+ {
+ "name": "x",
+ "transform": "identity",
+ "source-id": 1,
+ "field-id": 1000
+ }
+ ],
+ "properties": {},
+ "current-snapshot-id": -1,
+ "snapshots": []
+}
\ No newline at end of file