This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e0b7f352 Make `schema` and `partition_spec` optional for 
TableMetadataV1 (#1087)
e0b7f352 is described below

commit e0b7f352ed8857cb401db2c8e3aa97e61fe4bd71
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Mon Mar 31 22:09:58 2025 +0900

    Make `schema` and `partition_spec` optional for TableMetadataV1 (#1087)
    
    ## Which issue does this PR close?
    
    - Closes #1088
    
    ## What changes are included in this PR?
    
    Fixes an issue reading V1 Iceberg tables created by AWS Glue. The table
    metadata for V1 tables in Glue doesn't have the `schema` or
    `partition-spec` fields that `TableMetadataV1` marks as required. Fix
    this by making these fields optional and adding a test.
    
    Edit: I'm guessing Glue is only incidental - it could be legacy tables
    that were migrated into my Glue catalog.
    
    ## Are these changes tested?
    
    Yes, a new test verifies that a Glue V1 Iceberg table can be parsed
    correctly into a `TableMetadata`.
---
 crates/iceberg/src/spec/table_metadata.rs          | 237 ++++++++++++++++-----
 .../table_metadata/TableMetadataV1Compat.json      | 111 ++++++++++
 .../TableMetadataV1NoValidSchema.json              |  45 ++++
 ...leMetadataV1PartitionSpecsWithoutDefaultId.json |  58 +++++
 .../TableMetadataV1SchemasWithoutCurrentId.json    |  69 ++++++
 5 files changed, 462 insertions(+), 58 deletions(-)

diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index d5948af7..e9e99605 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -697,12 +697,14 @@ pub(super) mod _serde {
         pub location: String,
         pub last_updated_ms: i64,
         pub last_column_id: i32,
-        pub schema: SchemaV1,
+        /// `schema` is optional to prioritize `schemas` and 
`current-schema-id`, allowing liberal reading of V1 metadata.
+        pub schema: Option<SchemaV1>,
         #[serde(skip_serializing_if = "Option::is_none")]
         pub schemas: Option<Vec<SchemaV1>>,
         #[serde(skip_serializing_if = "Option::is_none")]
         pub current_schema_id: Option<i32>,
-        pub partition_spec: Vec<PartitionField>,
+        /// `partition_spec` is optional to prioritize `partition_specs`, 
aligning with liberal reading of potentially invalid V1 metadata.
+        pub partition_spec: Option<Vec<PartitionField>>,
         #[serde(skip_serializing_if = "Option::is_none")]
         pub partition_specs: Option<Vec<PartitionSpec>>,
         #[serde(skip_serializing_if = "Option::is_none")]
@@ -896,60 +898,80 @@ pub(super) mod _serde {
                 value.current_snapshot_id
             };
 
-            let schemas = value
-                .schemas
-                .map(|schemas| {
-                    Ok::<_, Error>(HashMap::from_iter(
-                        schemas
+            let (schemas, current_schema_id, current_schema) =
+                if let (Some(schemas_vec), Some(schema_id)) =
+                    (&value.schemas, value.current_schema_id)
+                {
+                    // Option 1: Use 'schemas' + 'current_schema_id'
+                    let schema_map = HashMap::from_iter(
+                        schemas_vec
+                            .clone()
                             .into_iter()
-                            .enumerate()
-                            .map(|(i, schema)| {
-                                Ok((
-                                    schema.schema_id.unwrap_or(i as i32),
-                                    Arc::new(schema.try_into()?),
-                                ))
+                            .map(|schema| {
+                                let schema: Schema = schema.try_into()?;
+                                Ok((schema.schema_id(), Arc::new(schema)))
                             })
-                            .collect::<Result<Vec<_>, Error>>()?
-                            .into_iter(),
-                    ))
-                })
-                .or_else(|| {
-                    Some(value.schema.try_into().map(|schema: Schema| {
-                        HashMap::from_iter(vec![(schema.schema_id(), 
Arc::new(schema))])
-                    }))
-                })
-                .transpose()?
-                .unwrap_or_default();
-            let current_schema_id = value
-                .current_schema_id
-                .unwrap_or_else(|| 
schemas.keys().copied().max().unwrap_or_default());
-            let current_schema = schemas
-                .get(&current_schema_id)
-                .ok_or_else(|| {
-                    Error::new(
+                            .collect::<Result<Vec<_>, Error>>()?,
+                    );
+
+                    let schema = schema_map
+                        .get(&schema_id)
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::DataInvalid,
+                                format!(
+                                    "No schema exists with the current schema 
id {}.",
+                                    schema_id
+                                ),
+                            )
+                        })?
+                        .clone();
+                    (schema_map, schema_id, schema)
+                } else if let Some(schema) = value.schema {
+                    // Option 2: Fall back to `schema`
+                    let schema: Schema = schema.try_into()?;
+                    let schema_id = schema.schema_id();
+                    let schema_arc = Arc::new(schema);
+                    let schema_map = HashMap::from_iter(vec![(schema_id, 
schema_arc.clone())]);
+                    (schema_map, schema_id, schema_arc)
+                } else {
+                    // Option 3: No valid schema configuration found
+                    return Err(Error::new(
                         ErrorKind::DataInvalid,
-                        format!(
-                            "No schema exists with the current schema id {}.",
-                            current_schema_id
-                        ),
-                    )
-                })?
-                .clone();
+                        "No valid schema configuration found in table 
metadata",
+                    ));
+                };
 
-            let partition_specs = match value.partition_specs {
-                Some(partition_specs) => partition_specs,
-                None => vec![PartitionSpec::builder(current_schema.clone())
+            // Prioritize 'partition_specs' over 'partition_spec'
+            let partition_specs = if let Some(specs_vec) = 
value.partition_specs {
+                // Option 1: Use 'partition_specs'
+                specs_vec
+                    .into_iter()
+                    .map(|x| (x.spec_id(), Arc::new(x)))
+                    .collect::<HashMap<_, _>>()
+            } else if let Some(partition_spec) = value.partition_spec {
+                // Option 2: Fall back to 'partition_spec'
+                let spec = PartitionSpec::builder(current_schema.clone())
                     .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
-                    
.add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))?
-                    .build()?],
-            }
-            .into_iter()
-            .map(|x| (x.spec_id(), Arc::new(x)))
-            .collect::<HashMap<_, _>>();
+                    .add_unbound_fields(partition_spec.into_iter().map(|f| 
f.into_unbound()))?
+                    .build()?;
 
+                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, 
Arc::new(spec))])
+            } else {
+                // Option 3: Create empty partition spec
+                let spec = PartitionSpec::builder(current_schema.clone())
+                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
+                    .build()?;
+
+                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, 
Arc::new(spec))])
+            };
+
+            // Get the default_spec_id, prioritizing the explicit value if 
provided
             let default_spec_id = value
                 .default_spec_id
                 .unwrap_or_else(|| 
partition_specs.keys().copied().max().unwrap_or_default());
+
+            // Get the default spec
             let default_spec: PartitionSpecRef = partition_specs
                 .get(&default_spec_id)
                 .map(|x| Arc::unwrap_or_clone(x.clone()))
@@ -1101,16 +1123,17 @@ pub(super) mod _serde {
                 location: v.location,
                 last_updated_ms: v.last_updated_ms,
                 last_column_id: v.last_column_id,
-                schema: v
-                    .schemas
-                    .get(&v.current_schema_id)
-                    .ok_or(Error::new(
-                        ErrorKind::Unexpected,
-                        "current_schema_id not found in schemas",
-                    ))?
-                    .as_ref()
-                    .clone()
-                    .into(),
+                schema: Some(
+                    v.schemas
+                        .get(&v.current_schema_id)
+                        .ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "current_schema_id not found in schemas",
+                        ))?
+                        .as_ref()
+                        .clone()
+                        .into(),
+                ),
                 schemas: Some(
                     v.schemas
                         .into_values()
@@ -1122,7 +1145,7 @@ pub(super) mod _serde {
                         .collect(),
                 ),
                 current_schema_id: Some(v.current_schema_id),
-                partition_spec: v.default_spec.fields().to_vec(),
+                partition_spec: Some(v.default_spec.fields().to_vec()),
                 partition_specs: Some(
                     v.partition_specs
                         .into_values()
@@ -2585,6 +2608,104 @@ mod tests {
         check_table_metadata_serde(&metadata, expected);
     }
 
+    #[test]
+    fn test_table_metadata_v1_compat() {
+        let metadata =
+            
fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
+
+        // Deserialize the JSON to verify it works
+        let desered_type: TableMetadata = serde_json::from_str(&metadata)
+            .expect("Failed to deserialize TableMetadataV1Compat.json");
+
+        // Verify some key fields match
+        assert_eq!(desered_type.format_version(), FormatVersion::V1);
+        assert_eq!(
+            desered_type.uuid(),
+            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
+        );
+        assert_eq!(
+            desered_type.location(),
+            "s3://bucket/warehouse/iceberg/glue.db/table_name"
+        );
+        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
+        assert_eq!(desered_type.current_schema_id(), 0);
+    }
+
+    #[test]
+    fn test_table_metadata_v1_schemas_without_current_id() {
+        let metadata = fs::read_to_string(
+            
"testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
+        )
+        .unwrap();
+
+        // Deserialize the JSON - this should succeed by using the 'schema' 
field instead of 'schemas'
+        let desered_type: TableMetadata = serde_json::from_str(&metadata)
+            .expect("Failed to deserialize 
TableMetadataV1SchemasWithoutCurrentId.json");
+
+        // Verify it used the 'schema' field
+        assert_eq!(desered_type.format_version(), FormatVersion::V1);
+        assert_eq!(
+            desered_type.uuid(),
+            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
+        );
+
+        // Get the schema and verify it has the expected fields
+        let schema = desered_type.current_schema();
+        assert_eq!(schema.as_struct().fields().len(), 3);
+        assert_eq!(schema.as_struct().fields()[0].name, "x");
+        assert_eq!(schema.as_struct().fields()[1].name, "y");
+        assert_eq!(schema.as_struct().fields()[2].name, "z");
+    }
+
+    #[test]
+    fn test_table_metadata_v1_no_valid_schema() {
+        let metadata =
+            
fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
+                .unwrap();
+
+        // Deserialize the JSON - this should fail because neither schemas + 
current_schema_id nor schema is valid
+        let desered: Result<TableMetadata, serde_json::Error> = 
serde_json::from_str(&metadata);
+
+        assert!(desered.is_err());
+        let error_message = desered.unwrap_err().to_string();
+        assert!(
+            error_message.contains("No valid schema configuration found"),
+            "Expected error about no valid schema configuration, got: {}",
+            error_message
+        );
+    }
+
+    #[test]
+    fn test_table_metadata_v1_partition_specs_without_default_id() {
+        let metadata = fs::read_to_string(
+            
"testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
+        )
+        .unwrap();
+
+        // Deserialize the JSON - this should succeed by inferring 
default_spec_id as the max spec ID
+        let desered_type: TableMetadata = serde_json::from_str(&metadata)
+            .expect("Failed to deserialize 
TableMetadataV1PartitionSpecsWithoutDefaultId.json");
+
+        // Verify basic metadata
+        assert_eq!(desered_type.format_version(), FormatVersion::V1);
+        assert_eq!(
+            desered_type.uuid(),
+            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
+        );
+
+        // Verify partition specs
+        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should 
pick the largest spec ID (2)
+        assert_eq!(desered_type.partition_specs.len(), 2);
+
+        // Verify the default spec has the expected fields
+        let default_spec = &desered_type.default_spec;
+        assert_eq!(default_spec.spec_id(), 2);
+        assert_eq!(default_spec.fields().len(), 1);
+        assert_eq!(default_spec.fields()[0].name, "y");
+        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
+        assert_eq!(default_spec.fields()[0].source_id, 2);
+    }
+
     #[test]
     fn test_table_metadata_v2_schema_not_found() {
         let metadata =
diff --git a/crates/iceberg/testdata/table_metadata/TableMetadataV1Compat.json 
b/crates/iceberg/testdata/table_metadata/TableMetadataV1Compat.json
new file mode 100644
index 00000000..f22c8621
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV1Compat.json
@@ -0,0 +1,111 @@
+{
+  "current-schema-id": 0,
+  "current-snapshot-id": 917896971991367610,
+  "default-sort-order-id": 0,
+  "default-spec-id": 0,
+  "format-version": 1,
+  "last-column-id": 4,
+  "last-partition-id": 999,
+  "last-updated-ms": 1727773114005,
+  "location": "s3://bucket/warehouse/iceberg/glue.db/table_name",
+  "metadata-log": [
+    {
+      "metadata-file": 
"s3://bucket/warehouse/iceberg/glue.db/table_name/metadata/00405-a3e8a93b-cc7f-430b-a731-6fa4357fb94e.metadata.json",
+      "timestamp-ms": 1727772184043
+    }
+  ],
+  "partition-specs": [
+    {
+      "fields": [],
+      "spec-id": 0
+    }
+  ],
+  "partition-statistics-files": [],
+  "properties": {
+    "owner": "spark",
+    "history.expire.max-snapshot-age-ms": "18000000",
+    "write.metadata.previous-versions-max": "20",
+    "stampisoend": "20241001085028951",
+    "write.metadata.delete-after-commit.enabled": "true",
+    "history.expire.max-ref-age-ms": "2592000000",
+    "write.distribution-mode": "range",
+    "write.merge.distribution-mode": "range",
+    "history.expire.min-snapshots-to-keep": "5"
+  },
+  "refs": {
+    "main": {
+      "snapshot-id": 917896971991367610,
+      "type": "branch"
+    }
+  },
+  "schemas": [
+    {
+      "fields": [
+        {
+          "id": 1,
+          "name": "record_id",
+          "required": false,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "record_time",
+          "required": false,
+          "type": "timestamptz"
+        },
+        {
+          "id": 3,
+          "name": "id",
+          "required": false,
+          "type": "long"
+        },
+        {
+          "id": 4,
+          "name": "name",
+          "required": false,
+          "type": "string"
+        }
+      ],
+      "schema-id": 0,
+      "type": "struct"
+    }
+  ],
+  "snapshot-log": [
+    {
+      "snapshot-id": 917896971991367610,
+      "timestamp-ms": 1727766315299
+    }
+  ],
+  "snapshots": [
+    {
+      "manifest-list": 
"s3://bucket/warehouse/iceberg/glue.db/table_name/metadata/snap-917896971991367610-1-d466808b-46f1-4486-a655-e50d07014597.avro",
+      "schema-id": 0,
+      "snapshot-id": 917896971991367610,
+      "summary": {
+        "added-data-files": "1",
+        "total-equality-deletes": "0",
+        "added-records": "3",
+        "deleted-data-files": "1",
+        "deleted-records": "3",
+        "total-records": "3",
+        "removed-files-size": "1366",
+        "changed-partition-count": "1",
+        "total-position-deletes": "0",
+        "added-files-size": "1366",
+        "total-delete-files": "0",
+        "total-files-size": "1366",
+        "total-data-files": "1",
+        "operation": "overwrite"
+      },
+      "timestamp-ms": 1727766315299
+    }
+  ],
+  "sort-orders": [
+    {
+      "fields": [],
+      "order-id": 0
+    }
+  ],
+  "statistics-files": [],
+  "table-uuid": "3276010d-7b1d-488c-98d8-9025fc4fde6b"
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/table_metadata/TableMetadataV1NoValidSchema.json 
b/crates/iceberg/testdata/table_metadata/TableMetadataV1NoValidSchema.json
new file mode 100644
index 00000000..b29c2fa3
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV1NoValidSchema.json
@@ -0,0 +1,45 @@
+{
+  "format-version": 1,
+  "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+  "location": "s3://bucket/test/location",
+  "last-updated-ms": 1602638573874,
+  "last-column-id": 3,
+  "schemas": [
+    {
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long",
+          "doc": "comment"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ],
+      "schema-id": 0,
+      "type": "struct"
+    }
+  ],
+  "partition-spec": [
+    {
+      "name": "x",
+      "transform": "identity",
+      "source-id": 1,
+      "field-id": 1000
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": -1,
+  "snapshots": []
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
 
b/crates/iceberg/testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
new file mode 100644
index 00000000..6f4ed6a9
--- /dev/null
+++ 
b/crates/iceberg/testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
@@ -0,0 +1,58 @@
+{
+  "format-version": 1,
+  "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+  "location": "s3://bucket/test/location",
+  "last-updated-ms": 1602638573874,
+  "last-column-id": 3,
+  "schema": {
+    "type": "struct",
+    "fields": [
+      {
+        "id": 1,
+        "name": "x",
+        "required": true,
+        "type": "long"
+      },
+      {
+        "id": 2,
+        "name": "y",
+        "required": true,
+        "type": "long",
+        "doc": "comment"
+      },
+      {
+        "id": 3,
+        "name": "z",
+        "required": true,
+        "type": "long"
+      }
+    ]
+  },
+  "partition-specs": [
+    {
+      "spec-id": 1,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    },
+    {
+      "spec-id": 2,
+      "fields": [
+        {
+          "name": "y",
+          "transform": "identity",
+          "source-id": 2,
+          "field-id": 1001
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": -1,
+  "snapshots": []
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json
 
b/crates/iceberg/testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json
new file mode 100644
index 00000000..7276c2a8
--- /dev/null
+++ 
b/crates/iceberg/testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json
@@ -0,0 +1,69 @@
+{
+  "format-version": 1,
+  "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+  "location": "s3://bucket/test/location",
+  "last-updated-ms": 1602638573874,
+  "last-column-id": 3,
+  "schemas": [
+    {
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long",
+          "doc": "comment"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ],
+      "schema-id": 0,
+      "type": "struct"
+    }
+  ],
+  "schema": {
+    "type": "struct",
+    "fields": [
+      {
+        "id": 1,
+        "name": "x",
+        "required": true,
+        "type": "long"
+      },
+      {
+        "id": 2,
+        "name": "y",
+        "required": true,
+        "type": "long",
+        "doc": "comment"
+      },
+      {
+        "id": 3,
+        "name": "z",
+        "required": true,
+        "type": "long"
+      }
+    ]
+  },
+  "partition-spec": [
+    {
+      "name": "x",
+      "transform": "identity",
+      "source-id": 1,
+      "field-id": 1000
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": -1,
+  "snapshots": []
+}
\ No newline at end of file

Reply via email to