This is an automated email from the ASF dual-hosted git repository.

diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new b541c33ddd [#10280] Support Iceberg snapshot maintenance procedures 
via Gravitino Trino (#10500)
b541c33ddd is described below

commit b541c33ddd26db34a491994fdc2992ce1fad4b20
Author: Akshay Thorat <[email protected]>
AuthorDate: Tue May 5 18:43:43 2026 -0700

    [#10280] Support Iceberg snapshot maintenance procedures via Gravitino 
Trino (#10500)
    
    ### What changes were proposed in this pull request?
    
    Delegate Iceberg snapshot maintenance procedures (expire_snapshots,
    remove_orphan_files, rewrite_data_files/optimize, rewrite_manifests)
    from the Gravitino Trino Connector to the internal Iceberg connector.
    
    Changes:
    - GravitinoConnector: Add getProcedures() and getTableProcedures()
    - GravitinoMetadata: Add getLayoutForTableExecute(),
    beginTableExecute(), finishTableExecute() in base class
    - Version-specific metadata classes: Add getTableHandleForExecute() and
    executeTableExecute() with correct SPI signatures per Trino version
    - GravitinoPageSinkProvider: Add createPageSink for
    ConnectorTableExecuteHandle
    - Add unit tests (TestGravitinoConnectorProcedures) and integration
    tests
    
    ### Why are the changes needed?
    
    Fix #10280
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    All the tests passed
    
    ---------
    
    Co-authored-by: Yuhui <[email protected]>
---
 docs/trino-connector/catalog-iceberg.md            |  42 +++++
 .../00013_snapshot_maintenance.sql                 |  49 ++++++
 .../00013_snapshot_maintenance.txt                 |  19 +++
 .../trino/connector/GravitinoMetadata435.java      |  25 +++
 .../trino/connector/GravitinoMetadata440.java      |  25 +++
 .../trino/connector/GravitinoMetadata446.java      |  25 +++
 .../trino/connector/GravitinoMetadata452.java      |  25 +++
 .../trino/connector/GravitinoMetadata469.java      |  27 +++
 .../trino/connector/GravitinoMetadata478.java      |  28 +++
 .../trino/connector/GravitinoConnector.java        |  14 ++
 .../trino/connector/GravitinoMetadata.java         |  46 +++++
 .../trino/connector/GravitinoPageSinkProvider.java |  14 ++
 .../connector/GravitinoTableExecuteHandle.java     |  61 +++++++
 .../trino/connector/util/json/JsonCodec.java       |   5 +
 .../TestGravitinoConnectorProcedures.java          | 188 +++++++++++++++++++++
 15 files changed, 593 insertions(+)

diff --git a/docs/trino-connector/catalog-iceberg.md 
b/docs/trino-connector/catalog-iceberg.md
index 20cd0b08f9..ae9a168c59 100644
--- a/docs/trino-connector/catalog-iceberg.md
+++ b/docs/trino-connector/catalog-iceberg.md
@@ -73,6 +73,48 @@ See also [Delete 
limitation](https://trino.io/docs/current/connector/iceberg.htm
 
 `MERGE` is only supported for table using v2 or higher of the Iceberg 
specification.
 
+### Table procedures
+
+The Apache Gravitino Trino connector delegates Iceberg table maintenance 
procedures
+to the underlying Iceberg connector, so they can be invoked via
+`ALTER TABLE ... EXECUTE` on Iceberg tables managed by Gravitino. The following
+procedures are supported:
+
+| Procedure            | Description                                           
                                               |
+|----------------------|------------------------------------------------------------------------------------------------------|
+| `expire_snapshots`   | Remove old snapshots and their associated 
metadata/data files to reclaim storage.                    |
+| `remove_orphan_files`| Remove files in the table's data directory that are 
not referenced by any snapshot.                  |
+| `optimize`           | Rewrite small data files into fewer, larger files to 
improve read performance (a.k.a. `rewrite_data_files`). |
+| `rewrite_manifests`  | Rewrite the table's manifest files to optimize 
metadata scans.                                       |
+
+Example usage:
+
+```sql
+-- Expire snapshots older than the default retention threshold
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE expire_snapshots;
+
+-- Expire snapshots older than 7 days (requires the minimum retention override
+-- to be less than or equal to the requested threshold)
+ALTER TABLE iceberg_test.database_01.table_01
+  EXECUTE expire_snapshots(retention_threshold => '7d');
+
+-- Remove orphan files
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE remove_orphan_files;
+
+-- Compact small data files
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE optimize;
+
+-- Compact small data files whose size is under a threshold
+ALTER TABLE iceberg_test.database_01.table_01
+  EXECUTE optimize(file_size_threshold => '128MB');
+
+-- Rewrite manifests for faster metadata scans
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE rewrite_manifests;
+```
+
+For the full list of parameters accepted by each procedure, see the
+[Trino Iceberg connector 
documentation](https://trino.io/docs/current/connector/iceberg.html#alter-table-execute).
+
 ## Table and Schema properties
 
 ### Create a schema with properties
diff --git 
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql
 
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql
new file mode 100644
index 0000000000..9e8871a717
--- /dev/null
+++ 
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql
@@ -0,0 +1,49 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Test Iceberg snapshot maintenance procedures via Gravitino connector
+
+CREATE SCHEMA IF NOT EXISTS gt_snapshot_test;
+
+CREATE TABLE gt_snapshot_test.maintenance_table (
+    id int,
+    name varchar
+);
+
+-- Insert data to create snapshots
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (1, 'alice');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (2, 'bob');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (3, 'charlie');
+
+-- Verify we have multiple snapshots
+SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots";
+
+-- Test expire_snapshots procedure (delegation to inner Iceberg connector).
+-- Note: remove_orphan_files, optimize, and rewrite_manifests procedures
+-- return non-deterministic statistics rows (file counts vary by run), so
+-- they are covered by Java unit tests instead of this SQL integration test.
+ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots;
+
+-- Verify table data is still intact after expire_snapshots
+SELECT count(*) FROM gt_snapshot_test.maintenance_table;
+
+-- Cleanup
+DROP TABLE gt_snapshot_test.maintenance_table;
+
+DROP SCHEMA gt_snapshot_test;
diff --git 
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt
 
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt
new file mode 100644
index 0000000000..0bc7b82cb2
--- /dev/null
+++ 
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt
@@ -0,0 +1,19 @@
+CREATE SCHEMA
+
+CREATE TABLE
+
+INSERT: 1 row
+
+INSERT: 1 row
+
+INSERT: 1 row
+
+"true"
+
+ALTER TABLE EXECUTE
+
+"3"
+
+DROP TABLE
+
+DROP SCHEMA
diff --git 
a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
 
b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
index 919d53f4a6..fdb584bb24 100644
--- 
a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
+++ 
b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
@@ -24,11 +24,13 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.statistics.ComputedStatistics;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
@@ -50,6 +52,29 @@ public class GravitinoMetadata435 extends GravitinoMetadata {
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), 
gravitinoColumn);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata
+        .getTableHandleForExecute(
+            session,
+            GravitinoHandle.unWrap(tableHandle),
+            procedureName,
+            executeProperties,
+            retryMode)
+        .map(GravitinoTableExecuteHandle::new);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, 
GravitinoHandle.unWrap(tableExecuteHandle));
+  }
+
   @Override
   public Optional<ConnectorOutputMetadata> finishInsert(
       ConnectorSession session,
diff --git 
a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
 
b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
index afcf61bd60..530231cdc9 100644
--- 
a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
+++ 
b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
@@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.statistics.ComputedStatistics;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
@@ -52,6 +54,29 @@ public class GravitinoMetadata440 extends GravitinoMetadata {
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), 
gravitinoColumn);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata
+        .getTableHandleForExecute(
+            session,
+            GravitinoHandle.unWrap(tableHandle),
+            procedureName,
+            executeProperties,
+            retryMode)
+        .map(GravitinoTableExecuteHandle::new);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, 
GravitinoHandle.unWrap(tableExecuteHandle));
+  }
+
   @Override
   public Optional<ConnectorOutputMetadata> finishInsert(
       ConnectorSession session,
diff --git 
a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
 
b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
index 86345379c9..6dbc31b041 100644
--- 
a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
+++ 
b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
@@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.statistics.ComputedStatistics;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
@@ -52,6 +54,29 @@ public class GravitinoMetadata446 extends GravitinoMetadata {
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), 
gravitinoColumn);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata
+        .getTableHandleForExecute(
+            session,
+            GravitinoHandle.unWrap(tableHandle),
+            procedureName,
+            executeProperties,
+            retryMode)
+        .map(GravitinoTableExecuteHandle::new);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, 
GravitinoHandle.unWrap(tableExecuteHandle));
+  }
+
   @Override
   public Optional<ConnectorOutputMetadata> finishInsert(
       ConnectorSession session,
diff --git 
a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
 
b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
index ff43beb290..a36249d71a 100644
--- 
a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
+++ 
b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
@@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.statistics.ComputedStatistics;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
@@ -67,6 +69,29 @@ public class GravitinoMetadata452 extends GravitinoMetadata {
         computedStatistics);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata
+        .getTableHandleForExecute(
+            session,
+            GravitinoHandle.unWrap(tableHandle),
+            procedureName,
+            executeProperties,
+            retryMode)
+        .map(GravitinoTableExecuteHandle::new);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, 
GravitinoHandle.unWrap(tableExecuteHandle));
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public ConnectorMergeTableHandle beginMerge(
diff --git 
a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
 
b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
index 27307f2036..ae40f3b58a 100644
--- 
a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
+++ 
b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
@@ -22,10 +22,12 @@ import io.airlift.slice.Slice;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ColumnPosition;
+import io.trino.spi.connector.ConnectorAccessControl;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.SchemaTableName;
@@ -73,6 +75,31 @@ public class GravitinoMetadata469 extends GravitinoMetadata {
         computedStatistics);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorAccessControl accessControl,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata
+        .getTableHandleForExecute(
+            session,
+            accessControl,
+            GravitinoHandle.unWrap(tableHandle),
+            procedureName,
+            executeProperties,
+            retryMode)
+        .map(GravitinoTableExecuteHandle::new);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, 
GravitinoHandle.unWrap(tableExecuteHandle));
+  }
+
   @Override
   public ConnectorMergeTableHandle beginMerge(
       ConnectorSession session,
diff --git 
a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
 
b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
index 40173cdb42..c67be1527a 100644
--- 
a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
+++ 
b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
@@ -22,10 +22,12 @@ import io.airlift.slice.Slice;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ColumnPosition;
+import io.trino.spi.connector.ConnectorAccessControl;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.SchemaTableName;
@@ -66,6 +68,32 @@ public class GravitinoMetadata478 extends GravitinoMetadata {
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), 
gravitinoColumn, columnPosition);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorAccessControl accessControl,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata
+        .getTableHandleForExecute(
+            session,
+            accessControl,
+            GravitinoHandle.unWrap(tableHandle),
+            procedureName,
+            executeProperties,
+            retryMode)
+        .map(GravitinoTableExecuteHandle::new);
+  }
+
+  @Override
+  public Map<String, Long> executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    return internalMetadata.executeTableExecute(
+        session, GravitinoHandle.unWrap(tableExecuteHandle));
+  }
+
   @Override
   public Optional<ConnectorOutputMetadata> finishInsert(
       ConnectorSession session,
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
index aa339ed7d6..01d419d1fe 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
@@ -37,6 +37,8 @@ import io.trino.spi.connector.ConnectorRecordSetProvider;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.TableProcedureMetadata;
+import io.trino.spi.procedure.Procedure;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;
 import java.util.List;
@@ -125,6 +127,18 @@ public class GravitinoConnector implements Connector {
     throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
   }
 
+  @Override
+  public Set<Procedure> getProcedures() {
+    Connector internalConnector = 
catalogConnectorContext.getInternalConnector();
+    return internalConnector.getProcedures();
+  }
+
+  @Override
+  public Set<TableProcedureMetadata> getTableProcedures() {
+    Connector internalConnector = 
catalogConnectorContext.getInternalConnector();
+    return internalConnector.getTableProcedures();
+  }
+
   @Override
   public List<PropertyMetadata<?>> getTableProperties() {
     return catalogConnectorContext.getTableProperties();
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index ca8d1150bd..bc37c12d60 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -23,16 +23,19 @@ import static 
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import io.airlift.slice.Slice;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.AggregateFunction;
 import io.trino.spi.connector.AggregationApplicationResult;
 import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.BeginTableExecuteResult;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableLayout;
 import io.trino.spi.connector.ConnectorTableMetadata;
@@ -675,6 +678,49 @@ public abstract class GravitinoMetadata implements 
ConnectorMetadata {
                     : new ConnectorTableLayout(result.getPartitionColumns()));
   }
 
+  @Override
+  public Optional<ConnectorTableLayout> getLayoutForTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    return internalMetadata
+        .getLayoutForTableExecute(session, 
GravitinoHandle.unWrap(tableExecuteHandle))
+        .map(
+            result ->
+                result.getPartitioning().isPresent()
+                    ? new ConnectorTableLayout(
+                        new 
GravitinoPartitioningHandle(result.getPartitioning().get()),
+                        result.getPartitionColumns(),
+                        result.supportsMultipleWritersPerPartition())
+                    : new ConnectorTableLayout(result.getPartitionColumns()));
+  }
+
+  @Override
+  public BeginTableExecuteResult<ConnectorTableExecuteHandle, 
ConnectorTableHandle>
+      beginTableExecute(
+          ConnectorSession session,
+          ConnectorTableExecuteHandle tableExecuteHandle,
+          ConnectorTableHandle updatedSourceTableHandle) {
+    BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> 
result =
+        internalMetadata.beginTableExecute(
+            session,
+            GravitinoHandle.unWrap(tableExecuteHandle),
+            GravitinoHandle.unWrap(updatedSourceTableHandle));
+    SchemaTableName tableName = getTableName(updatedSourceTableHandle);
+    return new BeginTableExecuteResult<>(
+        new GravitinoTableExecuteHandle(result.getTableExecuteHandle()),
+        new GravitinoTableHandle(
+            tableName.getSchemaName(), tableName.getTableName(), 
result.getSourceHandle()));
+  }
+
+  @Override
+  public void finishTableExecute(
+      ConnectorSession session,
+      ConnectorTableExecuteHandle tableExecuteHandle,
+      Collection<Slice> fragments,
+      List<Object> tableExecuteState) {
+    internalMetadata.finishTableExecute(
+        session, GravitinoHandle.unWrap(tableExecuteHandle), fragments, 
tableExecuteState);
+  }
+
   protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
     return ((GravitinoTableHandle) tableHandle).toSchemaTableName();
   }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
index 7122a7f484..7fc546ef68 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
@@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorPageSink;
 import io.trino.spi.connector.ConnectorPageSinkId;
 import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import org.apache.commons.lang3.NotImplementedException;
 
@@ -65,6 +66,19 @@ public class GravitinoPageSinkProvider implements 
ConnectorPageSinkProvider {
         pageSinkId);
   }
 
+  @Override
+  public ConnectorPageSink createPageSink(
+      ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session,
+      ConnectorTableExecuteHandle tableExecuteHandle,
+      ConnectorPageSinkId pageSinkId) {
+    return pageSinkProvider.createPageSink(
+        GravitinoHandle.unWrap(transactionHandle),
+        session,
+        GravitinoHandle.unWrap(tableExecuteHandle),
+        pageSinkId);
+  }
+
   @Override
   public ConnectorMergeSink createMergeSink(
       ConnectorTransactionHandle transactionHandle,
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java
new file mode 100644
index 0000000000..2bf93484f6
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
+
+/** The GravitinoTableExecuteHandle is used for handling table execute 
operations. */
+public class GravitinoTableExecuteHandle
+    implements ConnectorTableExecuteHandle, 
GravitinoHandle<ConnectorTableExecuteHandle> {
+
+  private HandleWrapper<ConnectorTableExecuteHandle> handleWrapper =
+      new HandleWrapper<>(ConnectorTableExecuteHandle.class);
+
+  /**
+   * Constructs a new GravitinoTableExecuteHandle from a serialized handle 
string.
+   *
+   * @param handleString the serialized handle string
+   */
+  @JsonCreator
+  public GravitinoTableExecuteHandle(@JsonProperty(HANDLE_STRING) String 
handleString) {
+    this.handleWrapper = handleWrapper.fromJson(handleString);
+  }
+
+  /**
+   * Constructs a new GravitinoTableExecuteHandle from a 
ConnectorTableExecuteHandle.
+   *
+   * @param tableExecuteHandle the internal connector table execute handle
+   */
+  public GravitinoTableExecuteHandle(ConnectorTableExecuteHandle 
tableExecuteHandle) {
+    this.handleWrapper = new HandleWrapper<>(tableExecuteHandle);
+  }
+
+  @JsonProperty
+  @Override
+  public String getHandleString() {
+    return handleWrapper.toJson();
+  }
+
+  @Override
+  public ConnectorTableExecuteHandle getInternalHandle() {
+    return handleWrapper.getHandle();
+  }
+}
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
index 20e6844242..67fe43be03 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
@@ -39,6 +39,7 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.type.StandardTypes;
@@ -171,6 +172,10 @@ public class JsonCodec {
     objectMapper.registerModule(
         new AbstractTypedJacksonModule<>(
             ConnectorPartitioningHandle.class, nameResolver, classResolver) 
{});
+
+    objectMapper.registerModule(
+        new AbstractTypedJacksonModule<>(
+            ConnectorTableExecuteHandle.class, nameResolver, classResolver) 
{});
   }
 
   @SuppressWarnings("deprecation")
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java
new file mode 100644
index 0000000000..2a426a56e1
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.trino.spi.connector.BeginTableExecuteResult;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.TableProcedureMetadata;
+import io.trino.spi.procedure.Procedure;
+import java.util.List;
+import java.util.Set;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that verify the Gravitino Trino connector properly delegates 
procedure-related operations
+ * to the internal connector, enabling support for Iceberg snapshot 
maintenance procedures.
+ */
+public class TestGravitinoConnectorProcedures {
+
+  @Test
+  void testGetProceduresDelegatesToInternalConnector() {
+    Connector mockInternalConnector = mock(Connector.class);
+    Set<Procedure> expectedProcedures = Set.of(mock(Procedure.class), 
mock(Procedure.class));
+    when(mockInternalConnector.getProcedures()).thenReturn(expectedProcedures);
+
+    GravitinoConnector connector = createConnector(mockInternalConnector);
+    Set<Procedure> result = connector.getProcedures();
+
+    assertEquals(expectedProcedures, result);
+    verify(mockInternalConnector).getProcedures();
+  }
+
+  @Test
+  void testGetProceduresReturnsEmptySetWhenNoProcedures() {
+    Connector mockInternalConnector = mock(Connector.class);
+    when(mockInternalConnector.getProcedures()).thenReturn(Set.of());
+
+    GravitinoConnector connector = createConnector(mockInternalConnector);
+    Set<Procedure> result = connector.getProcedures();
+
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  void testGetTableProceduresDelegatesToInternalConnector() {
+    Connector mockInternalConnector = mock(Connector.class);
+    Set<TableProcedureMetadata> expectedProcedures = 
Set.of(mock(TableProcedureMetadata.class));
+    
when(mockInternalConnector.getTableProcedures()).thenReturn(expectedProcedures);
+
+    GravitinoConnector connector = createConnector(mockInternalConnector);
+    Set<TableProcedureMetadata> result = connector.getTableProcedures();
+
+    assertEquals(expectedProcedures, result);
+    verify(mockInternalConnector).getTableProcedures();
+  }
+
+  @Test
+  void testBeginTableExecuteUnwrapsAndWrapsHandles() {
+    ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+    ConnectorSession session = mock(ConnectorSession.class);
+    ConnectorTableHandle internalTableHandle = 
mock(ConnectorTableHandle.class);
+    ConnectorTableExecuteHandle executeHandle = 
mock(ConnectorTableExecuteHandle.class);
+    ConnectorTableHandle resultSourceHandle = mock(ConnectorTableHandle.class);
+    ConnectorTableExecuteHandle resultExecuteHandle = 
mock(ConnectorTableExecuteHandle.class);
+
+    GravitinoTableHandle gravitinoTableHandle =
+        new GravitinoTableHandle("test_schema", "test_table", 
internalTableHandle);
+    GravitinoTableExecuteHandle wrappedExecuteHandle =
+        new GravitinoTableExecuteHandle(executeHandle);
+
+    BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> 
internalResult =
+        new BeginTableExecuteResult<>(resultExecuteHandle, resultSourceHandle);
+
+    when(internalMetadata.beginTableExecute(session, executeHandle, 
internalTableHandle))
+        .thenReturn(internalResult);
+
+    GravitinoMetadata metadata = createMetadata(internalMetadata);
+
+    BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> 
result =
+        metadata.beginTableExecute(session, wrappedExecuteHandle, 
gravitinoTableHandle);
+
+    assertTrue(result.getTableExecuteHandle() instanceof 
GravitinoTableExecuteHandle);
+    assertSame(
+        resultExecuteHandle,
+        ((GravitinoTableExecuteHandle) 
result.getTableExecuteHandle()).getInternalHandle());
+    assertTrue(result.getSourceHandle() instanceof GravitinoTableHandle);
+    GravitinoTableHandle wrappedSource = (GravitinoTableHandle) 
result.getSourceHandle();
+    assertEquals("test_schema", wrappedSource.getSchemaName());
+    assertEquals("test_table", wrappedSource.getTableName());
+    assertSame(resultSourceHandle, wrappedSource.getInternalHandle());
+  }
+
+  @Test
+  void testFinishTableExecuteDelegatesToInternalMetadata() {
+    ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+    ConnectorSession session = mock(ConnectorSession.class);
+    ConnectorTableExecuteHandle internalExecuteHandle = 
mock(ConnectorTableExecuteHandle.class);
+    GravitinoTableExecuteHandle wrappedHandle =
+        new GravitinoTableExecuteHandle(internalExecuteHandle);
+
+    GravitinoMetadata metadata = createMetadata(internalMetadata);
+    metadata.finishTableExecute(session, wrappedHandle, List.of(), List.of());
+
+    verify(internalMetadata)
+        .finishTableExecute(eq(session), eq(internalExecuteHandle), any(), 
any());
+  }
+
+  private GravitinoConnector createConnector(Connector internalConnector) {
+    GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+    
when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", 
"catalog"));
+
+    GravitinoMetalake metalake = mockMetalake();
+
+    CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class);
+    when(mockContext.getCatalog()).thenReturn(mockCatalog);
+    when(mockContext.getMetalake()).thenReturn(metalake);
+    when(mockContext.getInternalConnector()).thenReturn(internalConnector);
+    // Stub getConfig() to support the forward-user logic added in 
GravitinoConnector
+    // (see apache/gravitino PR #10730). Uses lenient() so tests still pass on 
branches
+    // where the constructor does not yet call getConfig().
+    
lenient().when(mockContext.getConfig()).thenReturn(mock(GravitinoConfig.class));
+
+    return new GravitinoConnector(mockContext);
+  }
+
+  private GravitinoMetadata createMetadata(ConnectorMetadata internalMetadata) 
{
+    CatalogConnectorMetadata catalogConnectorMetadata = 
mock(CatalogConnectorMetadata.class);
+    CatalogConnectorMetadataAdapter metadataAdapter = 
mock(CatalogConnectorMetadataAdapter.class);
+    return new TestGravitinoMetadataImpl(
+        catalogConnectorMetadata, metadataAdapter, internalMetadata);
+  }
+
+  private static GravitinoMetalake mockMetalake() {
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    when(metalake.loadCatalog(any())).thenReturn(catalog);
+    return metalake;
+  }
+
+  private static final class TestGravitinoMetadataImpl extends 
GravitinoMetadata {
+    private TestGravitinoMetadataImpl(
+        CatalogConnectorMetadata catalogConnectorMetadata,
+        CatalogConnectorMetadataAdapter metadataAdapter,
+        ConnectorMetadata internalMetadata) {
+      super(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+    }
+  }
+}


Reply via email to