This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 56fda82f3 feat!(catalog): adding support for purge_table (#2232)
56fda82f3 is described below

commit 56fda82f3ad68f3231ece7150f52b78df71bb06e
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Mar 24 18:00:35 2026 -0700

    feat!(catalog): adding support for purge_table (#2232)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #2133
    
    ## What changes are included in this PR?
    - Add catalog/utils.rs to provide helpers to delete table data using
    file_io and table_metadata
    - Add new API `purge_table` to `Catalog` trait and add default
    implementation
    - Implement purge_table for S3TableCatalog and RestCatalog
    
    <!--
    Provide a summary of the modifications in this PR. List the main changes
    such as new features, bug fixes, refactoring, or any other updates.
    -->
    
    ## Are these changes tested?
    Added new tests in table_suite
    <!--
    Specify what the tests cover (unit test, integration test, etc.).
    
    If tests are not included in your PR, please explain why (for example,
    whether the changes are already covered by existing tests).
    -->
---
 crates/catalog/glue/src/catalog.rs           |  11 +++
 crates/catalog/hms/src/catalog.rs            |  11 +++
 crates/catalog/loader/tests/common/mod.rs    |   2 +-
 crates/catalog/loader/tests/table_suite.rs   |  77 +++++++++++++++++
 crates/catalog/rest/src/catalog.rs           |  55 +++++++-----
 crates/catalog/s3tables/src/catalog.rs       |  19 ++--
 crates/catalog/sql/src/catalog.rs            |  11 +++
 crates/iceberg/src/catalog/memory/catalog.rs |  11 +++
 crates/iceberg/src/catalog/mod.rs            |   9 ++
 crates/iceberg/src/catalog/utils.rs          | 124 +++++++++++++++++++++++++++
 crates/iceberg/src/lib.rs                    |   1 +
 crates/iceberg/src/spec/table_properties.rs  |  35 ++++++++
 12 files changed, 337 insertions(+), 29 deletions(-)

diff --git a/crates/catalog/glue/src/catalog.rs 
b/crates/catalog/glue/src/catalog.rs
index 9e9d4580c..a7e017133 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -659,6 +659,17 @@ impl Catalog for GlueCatalog {
         Ok(())
     }
 
+    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
+        let table_info = self.load_table(table).await?;
+        self.drop_table(table).await?;
+        iceberg::drop_table_data(
+            table_info.file_io(),
+            table_info.metadata(),
+            table_info.metadata_location(),
+        )
+        .await
+    }
+
     /// Asynchronously checks the existence of a specified table
     /// in the database.
     ///
diff --git a/crates/catalog/hms/src/catalog.rs 
b/crates/catalog/hms/src/catalog.rs
index bd7819373..4a030c110 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -604,6 +604,17 @@ impl Catalog for HmsCatalog {
         Ok(())
     }
 
+    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
+        let table_info = self.load_table(table).await?;
+        self.drop_table(table).await?;
+        iceberg::drop_table_data(
+            table_info.file_io(),
+            table_info.metadata(),
+            table_info.metadata_location(),
+        )
+        .await
+    }
+
     /// Asynchronously checks the existence of a specified table
     /// in the database.
     ///
diff --git a/crates/catalog/loader/tests/common/mod.rs 
b/crates/catalog/loader/tests/common/mod.rs
index 90b72df8a..600cd9b6f 100644
--- a/crates/catalog/loader/tests/common/mod.rs
+++ b/crates/catalog/loader/tests/common/mod.rs
@@ -335,7 +335,7 @@ pub fn assert_map_contains(expected: &HashMap<String, 
String>, actual: &HashMap<
 pub async fn cleanup_namespace_dyn(catalog: &dyn Catalog, namespace: 
&NamespaceIdent) {
     if let Ok(tables) = catalog.list_tables(namespace).await {
         for table in tables {
-            let _ = catalog.drop_table(&table).await;
+            let _ = catalog.purge_table(&table).await;
         }
     }
     let _ = catalog.drop_namespace(namespace).await;
diff --git a/crates/catalog/loader/tests/table_suite.rs 
b/crates/catalog/loader/tests/table_suite.rs
index 6b7a3a822..cdc9b1104 100644
--- a/crates/catalog/loader/tests/table_suite.rs
+++ b/crates/catalog/loader/tests/table_suite.rs
@@ -274,3 +274,80 @@ async fn test_catalog_drop_table_missing_errors(#[case] 
kind: CatalogKind) -> Re
     assert!(catalog.drop_table(&table_ident).await.is_err());
     Ok(())
 }
+
+// Common behavior: purge_table removes the table from the catalog.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_purge_table(#[case] kind: CatalogKind) -> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_purge_table",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+    catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+    let table_name = normalize_test_name_with_parts!("catalog_purge_table", 
harness.label, "table");
+    let table = catalog
+        .create_table(&namespace, table_creation(table_name))
+        .await?;
+    let ident = table.identifier().clone();
+
+    assert!(catalog.table_exists(&ident).await?);
+
+    // Capture metadata location and file_io before purge so we can verify
+    // that the underlying files are actually deleted.
+    let metadata_location = table.metadata_location().map(|s| s.to_string());
+    let file_io = table.file_io().clone();
+
+    catalog.purge_table(&ident).await?;
+    assert!(!catalog.table_exists(&ident).await?);
+
+    if let Some(location) = &metadata_location {
+        assert!(
+            !file_io.exists(location).await?,
+            "Metadata file should have been deleted after purge"
+        );
+    }
+
+    catalog.drop_namespace(&namespace).await?;
+
+    Ok(())
+}
+
+// Common behavior: purging a missing table should error.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_purge_table_missing_errors(#[case] kind: CatalogKind) -> 
Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_purge_table_missing_errors",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+    catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+    let table_ident = TableIdent::new(namespace.clone(), 
"missing".to_string());
+    assert!(catalog.purge_table(&table_ident).await.is_err());
+    Ok(())
+}
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index 3551b0516..7d5df24d5 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -363,6 +363,35 @@ impl RestCatalog {
         }
     }
 
+    /// Sends a DELETE request for the given table, optionally requesting 
purge.
+    async fn delete_table(&self, table: &TableIdent, purge: bool) -> 
Result<()> {
+        let context = self.context().await?;
+
+        let mut request_builder = context
+            .client
+            .request(Method::DELETE, context.config.table_endpoint(table));
+
+        if purge {
+            request_builder = request_builder.query(&[("purgeRequested", 
"true")]);
+        }
+
+        let request = request_builder.build()?;
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::TableNotFound,
+                "Tried to drop a table that does not exist",
+            )),
+            _ => Err(deserialize_unexpected_catalog_error(
+                http_response,
+                context.client.disable_header_redaction(),
+            )
+            .await),
+        }
+    }
+
     /// Gets the [`RestContext`] from the catalog.
     async fn context(&self) -> Result<&RestContext> {
         self.ctx
@@ -828,27 +857,13 @@ impl Catalog for RestCatalog {
 
     /// Drop a table from the catalog.
     async fn drop_table(&self, table: &TableIdent) -> Result<()> {
-        let context = self.context().await?;
-
-        let request = context
-            .client
-            .request(Method::DELETE, context.config.table_endpoint(table))
-            .build()?;
-
-        let http_response = context.client.query_catalog(request).await?;
+        self.delete_table(table, false).await
+    }
 
-        match http_response.status() {
-            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
-            StatusCode::NOT_FOUND => Err(Error::new(
-                ErrorKind::TableNotFound,
-                "Tried to drop a table that does not exist",
-            )),
-            _ => Err(deserialize_unexpected_catalog_error(
-                http_response,
-                context.client.disable_header_redaction(),
-            )
-            .await),
-        }
+    /// Drop a table from the catalog and purge its data by sending
+    /// `purgeRequested=true` to the REST server.
+    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
+        self.delete_table(table, true).await
     }
 
     /// Check if a table exists in the catalog.
diff --git a/crates/catalog/s3tables/src/catalog.rs 
b/crates/catalog/s3tables/src/catalog.rs
index a416c38f2..b88bd77d2 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -562,15 +562,18 @@ impl Catalog for S3TablesCatalog {
         Ok(self.load_table_with_version_token(table_ident).await?.0)
     }
 
-    /// Drops an existing table from the s3tables catalog.
+    /// Not supported for S3Tables. Use `purge_table` instead.
     ///
-    /// Validates the table identifier and then deletes the corresponding
-    /// table from the s3tables catalog.
-    ///
-    /// This function can return an error in the following situations:
-    /// - Errors from the underlying database deletion process, converted using
-    /// `from_aws_sdk_error`.
-    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+    /// S3 Tables doesn't support soft delete, so dropping a table will 
permanently remove it from the catalog.
+    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "drop_table is not supported for S3Tables; use purge_table 
instead",
+        ))
+    }
+
+    /// Purge a table from the S3 Tables catalog.
+    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
         let req = self
             .s3tables_client
             .delete_table()
diff --git a/crates/catalog/sql/src/catalog.rs 
b/crates/catalog/sql/src/catalog.rs
index 195f6c9de..7e468e7e3 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -757,6 +757,17 @@ impl Catalog for SqlCatalog {
         Ok(())
     }
 
+    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
+        let table_info = self.load_table(table).await?;
+        self.drop_table(table).await?;
+        iceberg::drop_table_data(
+            table_info.file_io(),
+            table_info.metadata(),
+            table_info.metadata_location(),
+        )
+        .await
+    }
+
     async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
         if !self.table_exists(identifier).await? {
             return no_such_table_err(identifier);
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs 
b/crates/iceberg/src/catalog/memory/catalog.rs
index 25ae00441..8fa5c479c 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -326,6 +326,17 @@ impl Catalog for MemoryCatalog {
         Ok(())
     }
 
+    async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
+        let table_info = self.load_table(table_ident).await?;
+        self.drop_table(table_ident).await?;
+        crate::catalog::utils::drop_table_data(
+            table_info.file_io(),
+            table_info.metadata(),
+            table_info.metadata_location(),
+        )
+        .await
+    }
+
     /// Check if a table exists in the catalog.
     async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
         let root_namespace_state = self.root_namespace_state.lock().await;
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index 06326917e..f296cf226 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -19,6 +19,7 @@
 
 pub mod memory;
 mod metadata_location;
+pub(crate) mod utils;
 
 use std::collections::HashMap;
 use std::fmt::{Debug, Display};
@@ -98,6 +99,14 @@ pub trait Catalog: Debug + Sync + Send {
     /// Drop a table from the catalog, or returns error if it doesn't exist.
     async fn drop_table(&self, table: &TableIdent) -> Result<()>;
 
+    /// Drop a table from the catalog and delete the underlying table data.
+    ///
+    /// Implementations should load the table metadata, drop the table
+    /// from the catalog, then delete all associated data and metadata files.
+    /// The [`drop_table_data`](utils::drop_table_data) utility function can
+    /// be used for the file cleanup step.
+    async fn purge_table(&self, table: &TableIdent) -> Result<()>;
+
     /// Check if a table exists in the catalog.
     async fn table_exists(&self, table: &TableIdent) -> Result<bool>;
 
diff --git a/crates/iceberg/src/catalog/utils.rs 
b/crates/iceberg/src/catalog/utils.rs
new file mode 100644
index 000000000..d450f9df8
--- /dev/null
+++ b/crates/iceberg/src/catalog/utils.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility functions for catalog operations.
+
+use std::collections::HashSet;
+
+use futures::{TryStreamExt, stream};
+
+use crate::Result;
+use crate::io::FileIO;
+use crate::spec::TableMetadata;
+
+const DELETE_CONCURRENCY: usize = 10;
+
+/// Deletes all data and metadata files referenced by the given table metadata.
+///
+/// This mirrors the Java implementation's `CatalogUtil.dropTableData`.
+/// It collects all manifest files, manifest lists, previous metadata files,
+/// statistics files, and partition statistics files, then deletes them.
+///
+/// Data files within manifests are only deleted if the `gc.enabled` table
+/// property is `true` (the default), to avoid corrupting other tables that
+/// may share the same data files.
+pub async fn drop_table_data(
+    io: &FileIO,
+    metadata: &TableMetadata,
+    metadata_location: Option<&str>,
+) -> Result<()> {
+    let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
+    let mut manifests_to_delete: HashSet<String> = HashSet::new();
+
+    // Load all manifest lists concurrently
+    let results: Vec<_> =
+        futures::future::try_join_all(metadata.snapshots().map(|snapshot| 
async {
+            let manifest_list = snapshot.load_manifest_list(io, 
metadata).await?;
+            Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), 
manifest_list))
+        }))
+        .await?;
+
+    for (manifest_list_location, manifest_list) in results {
+        if !manifest_list_location.is_empty() {
+            manifest_lists_to_delete.insert(manifest_list_location);
+        }
+        for manifest_file in manifest_list.entries() {
+            manifests_to_delete.insert(manifest_file.manifest_path.clone());
+        }
+    }
+
+    // Delete data files only if gc.enabled is true, to avoid corrupting 
shared tables
+    if metadata.table_properties()?.gc_enabled {
+        delete_data_files(io, &manifests_to_delete).await?;
+    }
+
+    // Delete manifest files
+    io.delete_stream(stream::iter(manifests_to_delete)).await?;
+
+    // Delete manifest lists
+    io.delete_stream(stream::iter(manifest_lists_to_delete))
+        .await?;
+
+    // Delete previous metadata files
+    let prev_metadata_paths: Vec<String> = metadata
+        .metadata_log()
+        .iter()
+        .map(|m| m.metadata_file.clone())
+        .collect();
+    io.delete_stream(stream::iter(prev_metadata_paths)).await?;
+
+    // Delete statistics files
+    let stats_paths: Vec<String> = metadata
+        .statistics_iter()
+        .map(|s| s.statistics_path.clone())
+        .collect();
+    io.delete_stream(stream::iter(stats_paths)).await?;
+
+    // Delete partition statistics files
+    let partition_stats_paths: Vec<String> = metadata
+        .partition_statistics_iter()
+        .map(|s| s.statistics_path.clone())
+        .collect();
+    io.delete_stream(stream::iter(partition_stats_paths))
+        .await?;
+
+    // Delete the current metadata file
+    if let Some(location) = metadata_location {
+        io.delete(location).await?;
+    }
+
+    Ok(())
+}
+
+/// Reads manifests concurrently and deletes the data files referenced within.
+async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> 
Result<()> {
+    stream::iter(manifest_paths.iter().map(Ok))
+        .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async 
move {
+            let input = io.new_input(manifest_path)?;
+            let manifest_content = input.read().await?;
+            let manifest = 
crate::spec::Manifest::parse_avro(&manifest_content)?;
+
+            let data_file_paths = manifest
+                .entries()
+                .iter()
+                .map(|entry| entry.data_file.file_path().to_string())
+                .collect::<Vec<_>>();
+
+            io.delete_stream(stream::iter(data_file_paths)).await
+        })
+        .await
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 0b138d281..44a360142 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -71,6 +71,7 @@ pub use error::{Error, ErrorKind, Result};
 
 mod catalog;
 
+pub use catalog::utils::drop_table_data;
 pub use catalog::*;
 
 pub mod table;
diff --git a/crates/iceberg/src/spec/table_properties.rs 
b/crates/iceberg/src/spec/table_properties.rs
index 6e0831847..07c157304 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -114,6 +114,9 @@ pub struct TableProperties {
     pub metadata_compression_codec: CompressionCodec,
     /// Whether to use `FanoutWriter` for partitioned tables.
     pub write_datafusion_fanout_enabled: bool,
+    /// Whether garbage collection is enabled on drop.
+    /// When `false`, data files will not be deleted when a table is dropped.
+    pub gc_enabled: bool,
 }
 
 impl TableProperties {
@@ -212,6 +215,13 @@ impl TableProperties {
     pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = 
"write.datafusion.fanout.enabled";
     /// Default value for fanout writer enabled
     pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
+
+    /// Property key for enabling garbage collection on drop.
+    /// When set to `false`, data files will not be deleted when a table is 
dropped.
+    /// Defaults to `true`.
+    pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
+    /// Default value for gc.enabled
+    pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;
 }
 
 impl TryFrom<&HashMap<String, String>> for TableProperties {
@@ -256,6 +266,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties 
{
                 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
                 
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
             )?,
+            gc_enabled: parse_property(
+                props,
+                TableProperties::PROPERTY_GC_ENABLED,
+                TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
+            )?,
         })
     }
 }
@@ -294,6 +309,10 @@ mod tests {
             table_properties.metadata_compression_codec,
             CompressionCodec::None
         );
+        assert_eq!(
+            table_properties.gc_enabled,
+            TableProperties::PROPERTY_GC_ENABLED_DEFAULT
+        );
     }
 
     #[test]
@@ -377,12 +396,17 @@ mod tests {
                 
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
                 "512".to_string(),
             ),
+            (
+                TableProperties::PROPERTY_GC_ENABLED.to_string(),
+                "false".to_string(),
+            ),
         ]);
         let table_properties = TableProperties::try_from(&props).unwrap();
         assert_eq!(table_properties.commit_num_retries, 10);
         assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
         assert_eq!(table_properties.write_format_default, "avro".to_string());
         assert_eq!(table_properties.write_target_file_size_bytes, 512);
+        assert!(!table_properties.gc_enabled);
     }
 
     #[test]
@@ -429,6 +453,17 @@ mod tests {
         assert!(table_properties.to_string().contains(
             "Invalid value for write.target-file-size-bytes: invalid digit 
found in string"
         ));
+
+        let invalid_gc_enabled = HashMap::from([(
+            TableProperties::PROPERTY_GC_ENABLED.to_string(),
+            "notabool".to_string(),
+        )]);
+        let table_properties = 
TableProperties::try_from(&invalid_gc_enabled).unwrap_err();
+        assert!(
+            table_properties
+                .to_string()
+                .contains("Invalid value for gc.enabled")
+        );
     }
 
     #[test]

Reply via email to