This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new b541c33ddd [#10280] Support Iceberg snapshot maintenance procedures
via Gravitino Trino (#10500)
b541c33ddd is described below
commit b541c33ddd26db34a491994fdc2992ce1fad4b20
Author: Akshay Thorat <[email protected]>
AuthorDate: Tue May 5 18:43:43 2026 -0700
[#10280] Support Iceberg snapshot maintenance procedures via Gravitino
Trino (#10500)
### What changes were proposed in this pull request?
Delegate Iceberg snapshot maintenance procedures (expire_snapshots,
remove_orphan_files, rewrite_data_files/optimize, rewrite_manifests)
from the Gravitino Trino Connector to the internal Iceberg connector.
Changes:
- GravitinoConnector: Add getProcedures() and getTableProcedures()
- GravitinoMetadata: Add getLayoutForTableExecute(),
beginTableExecute(), finishTableExecute() in base class
- Version-specific metadata classes: Add getTableHandleForExecute() and
executeTableExecute() with correct SPI signatures per Trino version
- GravitinoPageSinkProvider: Add createPageSink for
ConnectorTableExecuteHandle
- Add unit tests (TestGravitinoConnectorProcedures) and integration
tests
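
The wrap/unwrap delegation used by these changes can be sketched as follows. This is a minimal, self-contained illustration, not the connector code: `DelegationSketch`, `ExecuteHandle`, `InnerHandle`, `WrappedHandle`, `Metadata`, `InnerMetadata`, and `DelegatingMetadata` are hypothetical stand-ins for the Trino SPI and Gravitino classes, and the method signatures are simplified on purpose.

```java
import java.util.Optional;

/**
 * Sketch of the wrap/unwrap delegation pattern. Every type here is a
 * hypothetical stand-in, not a real Trino SPI or Gravitino class.
 */
public class DelegationSketch {

  /** Stand-in for Trino's ConnectorTableExecuteHandle marker interface. */
  public interface ExecuteHandle {}

  /** Stand-in for the handle produced by the inner (Iceberg) connector. */
  public static final class InnerHandle implements ExecuteHandle {
    final String procedureName;

    InnerHandle(String procedureName) {
      this.procedureName = procedureName;
    }
  }

  /** Stand-in for GravitinoTableExecuteHandle: carries the inner handle. */
  public static final class WrappedHandle implements ExecuteHandle {
    final ExecuteHandle inner;

    WrappedHandle(ExecuteHandle inner) {
      this.inner = inner;
    }
  }

  /** Stand-in for the two metadata methods involved (signatures simplified). */
  public interface Metadata {
    Optional<ExecuteHandle> getTableHandleForExecute(String procedureName);

    String executeTableExecute(ExecuteHandle handle);
  }

  /** Inner connector: only understands its own handle type. */
  public static final class InnerMetadata implements Metadata {
    @Override
    public Optional<ExecuteHandle> getTableHandleForExecute(String procedureName) {
      // Unknown procedures yield an empty Optional.
      return "expire_snapshots".equals(procedureName)
          ? Optional.of(new InnerHandle(procedureName))
          : Optional.empty();
    }

    @Override
    public String executeTableExecute(ExecuteHandle handle) {
      // A wrapped handle would fail this cast, so the outer layer must unwrap first.
      return "executed " + ((InnerHandle) handle).procedureName;
    }
  }

  /** Outer connector: wraps handles going out, unwraps handles coming in. */
  public static final class DelegatingMetadata implements Metadata {
    private final Metadata internal;

    public DelegatingMetadata(Metadata internal) {
      this.internal = internal;
    }

    @Override
    public Optional<ExecuteHandle> getTableHandleForExecute(String procedureName) {
      // Wrap the inner handle so the engine only ever sees the outer type.
      return internal.getTableHandleForExecute(procedureName).map(WrappedHandle::new);
    }

    @Override
    public String executeTableExecute(ExecuteHandle handle) {
      // Unwrap before delegating so the inner connector gets its own handle back.
      return internal.executeTableExecute(((WrappedHandle) handle).inner);
    }
  }

  public static void main(String[] args) {
    Metadata metadata = new DelegatingMetadata(new InnerMetadata());
    ExecuteHandle handle =
        metadata.getTableHandleForExecute("expire_snapshots")
            .orElseThrow(IllegalStateException::new);
    System.out.println(metadata.executeTableExecute(handle));
  }
}
```

In the actual change, the same shape appears as `.map(GravitinoTableExecuteHandle::new)` on the way out and `GravitinoHandle.unWrap(...)` on the way in; the sketch only mirrors that structure.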
### Why are the changes needed?
Fix #10280
### Does this PR introduce any user-facing change?
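Yes. Iceberg tables managed by Gravitino now accept `ALTER TABLE ... EXECUTE`
for the procedures listed above, and docs/trino-connector/catalog-iceberg.md
documents them.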
### How was this patch tested?
The new unit tests (TestGravitinoConnectorProcedures) and the SQL integration
test (00013_snapshot_maintenance) pass.
---------
Co-authored-by: Yuhui <[email protected]>
---
docs/trino-connector/catalog-iceberg.md | 42 +++++
.../00013_snapshot_maintenance.sql | 49 ++++++
.../00013_snapshot_maintenance.txt | 19 +++
.../trino/connector/GravitinoMetadata435.java | 25 +++
.../trino/connector/GravitinoMetadata440.java | 25 +++
.../trino/connector/GravitinoMetadata446.java | 25 +++
.../trino/connector/GravitinoMetadata452.java | 25 +++
.../trino/connector/GravitinoMetadata469.java | 27 +++
.../trino/connector/GravitinoMetadata478.java | 28 +++
.../trino/connector/GravitinoConnector.java | 14 ++
.../trino/connector/GravitinoMetadata.java | 46 +++++
.../trino/connector/GravitinoPageSinkProvider.java | 14 ++
.../connector/GravitinoTableExecuteHandle.java | 61 +++++++
.../trino/connector/util/json/JsonCodec.java | 5 +
.../TestGravitinoConnectorProcedures.java | 188 +++++++++++++++++++++
15 files changed, 593 insertions(+)
diff --git a/docs/trino-connector/catalog-iceberg.md b/docs/trino-connector/catalog-iceberg.md
index 20cd0b08f9..ae9a168c59 100644
--- a/docs/trino-connector/catalog-iceberg.md
+++ b/docs/trino-connector/catalog-iceberg.md
@@ -73,6 +73,48 @@ See also [Delete limitation](https://trino.io/docs/current/connector/iceberg.htm
`MERGE` is only supported for table using v2 or higher of the Iceberg specification.
+### Table procedures
+
+The Apache Gravitino Trino connector delegates Iceberg table maintenance procedures
+to the underlying Iceberg connector, so they can be invoked via
+`ALTER TABLE ... EXECUTE` on Iceberg tables managed by Gravitino. The following
+procedures are supported:
+
+| Procedure            | Description                                                                                          |
+|----------------------|------------------------------------------------------------------------------------------------------|
+| `expire_snapshots`   | Remove old snapshots and their associated metadata/data files to reclaim storage.                    |
+| `remove_orphan_files`| Remove files in the table's data directory that are not referenced by any snapshot.                  |
+| `optimize`           | Rewrite small data files into fewer, larger files to improve read performance (a.k.a. `rewrite_data_files`). |
+| `rewrite_manifests`  | Rewrite the table's manifest files to optimize metadata scans.                                       |
+
+Example usage:
+
+```sql
+-- Expire snapshots older than the default retention threshold
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE expire_snapshots;
+
+-- Expire snapshots older than 7 days (the configured minimum retention, Trino's
+-- iceberg.expire-snapshots.min-retention, must not exceed the requested threshold)
+ALTER TABLE iceberg_test.database_01.table_01
+ EXECUTE expire_snapshots(retention_threshold => '7d');
+
+-- Remove orphan files
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE remove_orphan_files;
+
+-- Compact small data files
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE optimize;
+
+-- Compact small data files whose size is under a threshold
+ALTER TABLE iceberg_test.database_01.table_01
+ EXECUTE optimize(file_size_threshold => '128MB');
+
+-- Rewrite manifests for faster metadata scans
+ALTER TABLE iceberg_test.database_01.table_01 EXECUTE rewrite_manifests;
+```
+
+For the full list of parameters accepted by each procedure, see the
+[Trino Iceberg connector documentation](https://trino.io/docs/current/connector/iceberg.html#alter-table-execute).
+
## Table and Schema properties
### Create a schema with properties
diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql
new file mode 100644
index 0000000000..9e8871a717
--- /dev/null
+++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql
@@ -0,0 +1,49 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Test Iceberg snapshot maintenance procedures via Gravitino connector
+
+CREATE SCHEMA IF NOT EXISTS gt_snapshot_test;
+
+CREATE TABLE gt_snapshot_test.maintenance_table (
+ id int,
+ name varchar
+);
+
+-- Insert data to create snapshots
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (1, 'alice');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (2, 'bob');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (3, 'charlie');
+
+-- Verify we have multiple snapshots
+SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots";
+
+-- Test expire_snapshots procedure (delegation to inner Iceberg connector).
+-- Note: remove_orphan_files, optimize, and rewrite_manifests procedures
+-- return non-deterministic statistics rows (file counts vary by run), so
+-- they are covered by Java unit tests instead of this SQL integration test.
+ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots;
+
+-- Verify table data is still intact after expire_snapshots
+SELECT count(*) FROM gt_snapshot_test.maintenance_table;
+
+-- Cleanup
+DROP TABLE gt_snapshot_test.maintenance_table;
+
+DROP SCHEMA gt_snapshot_test;
diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt
new file mode 100644
index 0000000000..0bc7b82cb2
--- /dev/null
+++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt
@@ -0,0 +1,19 @@
+CREATE SCHEMA
+
+CREATE TABLE
+
+INSERT: 1 row
+
+INSERT: 1 row
+
+INSERT: 1 row
+
+"true"
+
+ALTER TABLE EXECUTE
+
+"3"
+
+DROP TABLE
+
+DROP SCHEMA
diff --git a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
index 919d53f4a6..fdb584bb24 100644
--- a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
+++ b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
@@ -24,11 +24,13 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.statistics.ComputedStatistics;
import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
@@ -50,6 +52,29 @@ public class GravitinoMetadata435 extends GravitinoMetadata {
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata
+ .getTableHandleForExecute(
+ session,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode)
+ .map(GravitinoTableExecuteHandle::new);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
+ }
+
@Override
public Optional<ConnectorOutputMetadata> finishInsert(
ConnectorSession session,
diff --git a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
index afcf61bd60..530231cdc9 100644
--- a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
+++ b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
@@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.statistics.ComputedStatistics;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
@@ -52,6 +54,29 @@ public class GravitinoMetadata440 extends GravitinoMetadata {
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata
+ .getTableHandleForExecute(
+ session,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode)
+ .map(GravitinoTableExecuteHandle::new);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
+ }
+
@Override
public Optional<ConnectorOutputMetadata> finishInsert(
ConnectorSession session,
diff --git a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
index 86345379c9..6dbc31b041 100644
--- a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
+++ b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
@@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.statistics.ComputedStatistics;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
@@ -52,6 +54,29 @@ public class GravitinoMetadata446 extends GravitinoMetadata {
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata
+ .getTableHandleForExecute(
+ session,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode)
+ .map(GravitinoTableExecuteHandle::new);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
+ }
+
@Override
public Optional<ConnectorOutputMetadata> finishInsert(
ConnectorSession session,
diff --git a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
index ff43beb290..a36249d71a 100644
--- a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
+++ b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
@@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.statistics.ComputedStatistics;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
@@ -67,6 +69,29 @@ public class GravitinoMetadata452 extends GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata
+ .getTableHandleForExecute(
+ session,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode)
+ .map(GravitinoTableExecuteHandle::new);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
+ }
+
@SuppressWarnings("deprecation")
@Override
public ConnectorMergeTableHandle beginMerge(
diff --git a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
index 27307f2036..ae40f3b58a 100644
--- a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
+++ b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
@@ -22,10 +22,12 @@ import io.airlift.slice.Slice;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnPosition;
+import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
@@ -73,6 +75,31 @@ public class GravitinoMetadata469 extends GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorAccessControl accessControl,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata
+ .getTableHandleForExecute(
+ session,
+ accessControl,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode)
+ .map(GravitinoTableExecuteHandle::new);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
+ }
+
@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session,
diff --git a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
index 40173cdb42..c67be1527a 100644
--- a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
+++ b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
@@ -22,10 +22,12 @@ import io.airlift.slice.Slice;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnPosition;
+import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
@@ -66,6 +68,32 @@ public class GravitinoMetadata478 extends GravitinoMetadata {
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn, columnPosition);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorAccessControl accessControl,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata
+ .getTableHandleForExecute(
+ session,
+ accessControl,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode)
+ .map(GravitinoTableExecuteHandle::new);
+ }
+
+ @Override
+ public Map<String, Long> executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ return internalMetadata.executeTableExecute(
+ session, GravitinoHandle.unWrap(tableExecuteHandle));
+ }
+
@Override
public Optional<ConnectorOutputMetadata> finishInsert(
ConnectorSession session,
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
index aa339ed7d6..01d419d1fe 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
@@ -37,6 +37,8 @@ import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.TableProcedureMetadata;
+import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;
import java.util.List;
@@ -125,6 +127,18 @@ public class GravitinoConnector implements Connector {
throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass");
}
+ @Override
+ public Set<Procedure> getProcedures() {
+ Connector internalConnector = catalogConnectorContext.getInternalConnector();
+ return internalConnector.getProcedures();
+ }
+
+ @Override
+ public Set<TableProcedureMetadata> getTableProcedures() {
+ Connector internalConnector = catalogConnectorContext.getInternalConnector();
+ return internalConnector.getTableProcedures();
+ }
+
@Override
public List<PropertyMetadata<?>> getTableProperties() {
return catalogConnectorContext.getTableProperties();
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index ca8d1150bd..bc37c12d60 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -23,16 +23,19 @@ import static org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
@@ -675,6 +678,49 @@ public abstract class GravitinoMetadata implements ConnectorMetadata {
: new ConnectorTableLayout(result.getPartitionColumns()));
}
+ @Override
+ public Optional<ConnectorTableLayout> getLayoutForTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ return internalMetadata
+ .getLayoutForTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle))
+ .map(
+ result ->
+ result.getPartitioning().isPresent()
+ ? new ConnectorTableLayout(
+ new GravitinoPartitioningHandle(result.getPartitioning().get()),
+ result.getPartitionColumns(),
+ result.supportsMultipleWritersPerPartition())
+ : new ConnectorTableLayout(result.getPartitionColumns()));
+ }
+
+ @Override
+ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle>
+ beginTableExecute(
+ ConnectorSession session,
+ ConnectorTableExecuteHandle tableExecuteHandle,
+ ConnectorTableHandle updatedSourceTableHandle) {
+ BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> result =
+ internalMetadata.beginTableExecute(
+ session,
+ GravitinoHandle.unWrap(tableExecuteHandle),
+ GravitinoHandle.unWrap(updatedSourceTableHandle));
+ SchemaTableName tableName = getTableName(updatedSourceTableHandle);
+ return new BeginTableExecuteResult<>(
+ new GravitinoTableExecuteHandle(result.getTableExecuteHandle()),
+ new GravitinoTableHandle(
+ tableName.getSchemaName(), tableName.getTableName(), result.getSourceHandle()));
+ }
+
+ @Override
+ public void finishTableExecute(
+ ConnectorSession session,
+ ConnectorTableExecuteHandle tableExecuteHandle,
+ Collection<Slice> fragments,
+ List<Object> tableExecuteState) {
+ internalMetadata.finishTableExecute(
+ session, GravitinoHandle.unWrap(tableExecuteHandle), fragments, tableExecuteState);
+ }
+
protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
return ((GravitinoTableHandle) tableHandle).toSchemaTableName();
}
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
index 7122a7f484..7fc546ef68 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
@@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import org.apache.commons.lang3.NotImplementedException;
@@ -65,6 +66,19 @@ public class GravitinoPageSinkProvider implements ConnectorPageSinkProvider {
pageSinkId);
}
+ @Override
+ public ConnectorPageSink createPageSink(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorTableExecuteHandle tableExecuteHandle,
+ ConnectorPageSinkId pageSinkId) {
+ return pageSinkProvider.createPageSink(
+ GravitinoHandle.unWrap(transactionHandle),
+ session,
+ GravitinoHandle.unWrap(tableExecuteHandle),
+ pageSinkId);
+ }
+
@Override
public ConnectorMergeSink createMergeSink(
ConnectorTransactionHandle transactionHandle,
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java
new file mode 100644
index 0000000000..2bf93484f6
--- /dev/null
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
+
+/** The GravitinoTableExecuteHandle is used for handling table execute operations. */
+public class GravitinoTableExecuteHandle
+ implements ConnectorTableExecuteHandle, GravitinoHandle<ConnectorTableExecuteHandle> {
+
+ private HandleWrapper<ConnectorTableExecuteHandle> handleWrapper =
+ new HandleWrapper<>(ConnectorTableExecuteHandle.class);
+
+ /**
+ * Constructs a new GravitinoTableExecuteHandle from a serialized handle string.
+ *
+ * @param handleString the serialized handle string
+ */
+ @JsonCreator
+ public GravitinoTableExecuteHandle(@JsonProperty(HANDLE_STRING) String handleString) {
+ this.handleWrapper = handleWrapper.fromJson(handleString);
+ }
+
+ /**
+ * Constructs a new GravitinoTableExecuteHandle from a ConnectorTableExecuteHandle.
+ *
+ * @param tableExecuteHandle the internal connector table execute handle
+ */
+ public GravitinoTableExecuteHandle(ConnectorTableExecuteHandle tableExecuteHandle) {
+ this.handleWrapper = new HandleWrapper<>(tableExecuteHandle);
+ }
+
+ @JsonProperty
+ @Override
+ public String getHandleString() {
+ return handleWrapper.toJson();
+ }
+
+ @Override
+ public ConnectorTableExecuteHandle getInternalHandle() {
+ return handleWrapper.getHandle();
+ }
+}
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
index 20e6844242..67fe43be03 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
@@ -39,6 +39,7 @@ import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.StandardTypes;
@@ -171,6 +172,10 @@ public class JsonCodec {
objectMapper.registerModule(
new AbstractTypedJacksonModule<>(
ConnectorPartitioningHandle.class, nameResolver, classResolver) {});
+
+ objectMapper.registerModule(
+ new AbstractTypedJacksonModule<>(
+ ConnectorTableExecuteHandle.class, nameResolver, classResolver) {});
}
@SuppressWarnings("deprecation")
diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java
new file mode 100644
index 0000000000..2a426a56e1
--- /dev/null
+++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.trino.spi.connector.BeginTableExecuteResult;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableExecuteHandle;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.TableProcedureMetadata;
+import io.trino.spi.procedure.Procedure;
+import java.util.List;
+import java.util.Set;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that verify the Gravitino Trino connector properly delegates procedure-related operations
+ * to the internal connector, enabling support for Iceberg snapshot maintenance procedures.
+ */
+public class TestGravitinoConnectorProcedures {
+
+ @Test
+ void testGetProceduresDelegatesToInternalConnector() {
+ Connector mockInternalConnector = mock(Connector.class);
+ Set<Procedure> expectedProcedures = Set.of(mock(Procedure.class), mock(Procedure.class));
+ when(mockInternalConnector.getProcedures()).thenReturn(expectedProcedures);
+
+ GravitinoConnector connector = createConnector(mockInternalConnector);
+ Set<Procedure> result = connector.getProcedures();
+
+ assertEquals(expectedProcedures, result);
+ verify(mockInternalConnector).getProcedures();
+ }
+
+ @Test
+ void testGetProceduresReturnsEmptySetWhenNoProcedures() {
+ Connector mockInternalConnector = mock(Connector.class);
+ when(mockInternalConnector.getProcedures()).thenReturn(Set.of());
+
+ GravitinoConnector connector = createConnector(mockInternalConnector);
+ Set<Procedure> result = connector.getProcedures();
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testGetTableProceduresDelegatesToInternalConnector() {
+ Connector mockInternalConnector = mock(Connector.class);
+ Set<TableProcedureMetadata> expectedProcedures = Set.of(mock(TableProcedureMetadata.class));
+ when(mockInternalConnector.getTableProcedures()).thenReturn(expectedProcedures);
+
+ GravitinoConnector connector = createConnector(mockInternalConnector);
+ Set<TableProcedureMetadata> result = connector.getTableProcedures();
+
+ assertEquals(expectedProcedures, result);
+ verify(mockInternalConnector).getTableProcedures();
+ }
+
+ @Test
+ void testBeginTableExecuteUnwrapsAndWrapsHandles() {
+ ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+ ConnectorSession session = mock(ConnectorSession.class);
+ ConnectorTableHandle internalTableHandle = mock(ConnectorTableHandle.class);
+ ConnectorTableExecuteHandle executeHandle = mock(ConnectorTableExecuteHandle.class);
+ ConnectorTableHandle resultSourceHandle = mock(ConnectorTableHandle.class);
+ ConnectorTableExecuteHandle resultExecuteHandle = mock(ConnectorTableExecuteHandle.class);
+
+ GravitinoTableHandle gravitinoTableHandle =
+ new GravitinoTableHandle("test_schema", "test_table", internalTableHandle);
+ GravitinoTableExecuteHandle wrappedExecuteHandle =
+ new GravitinoTableExecuteHandle(executeHandle);
+
+ BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> internalResult =
+ new BeginTableExecuteResult<>(resultExecuteHandle, resultSourceHandle);
+
+ when(internalMetadata.beginTableExecute(session, executeHandle, internalTableHandle))
+ .thenReturn(internalResult);
+
+ GravitinoMetadata metadata = createMetadata(internalMetadata);
+
+ BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> result =
+ metadata.beginTableExecute(session, wrappedExecuteHandle, gravitinoTableHandle);
+
+ assertTrue(result.getTableExecuteHandle() instanceof GravitinoTableExecuteHandle);
+ assertSame(
+ resultExecuteHandle,
+ ((GravitinoTableExecuteHandle) result.getTableExecuteHandle()).getInternalHandle());
+ assertTrue(result.getSourceHandle() instanceof GravitinoTableHandle);
+ GravitinoTableHandle wrappedSource = (GravitinoTableHandle) result.getSourceHandle();
+ assertEquals("test_schema", wrappedSource.getSchemaName());
+ assertEquals("test_table", wrappedSource.getTableName());
+ assertSame(resultSourceHandle, wrappedSource.getInternalHandle());
+ }
+
+ @Test
+ void testFinishTableExecuteDelegatesToInternalMetadata() {
+ ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+ ConnectorSession session = mock(ConnectorSession.class);
+ ConnectorTableExecuteHandle internalExecuteHandle = mock(ConnectorTableExecuteHandle.class);
+ GravitinoTableExecuteHandle wrappedHandle =
+ new GravitinoTableExecuteHandle(internalExecuteHandle);
+
+ GravitinoMetadata metadata = createMetadata(internalMetadata);
+ metadata.finishTableExecute(session, wrappedHandle, List.of(), List.of());
+
+ verify(internalMetadata)
+ .finishTableExecute(eq(session), eq(internalExecuteHandle), any(), any());
+ }
+
+ private GravitinoConnector createConnector(Connector internalConnector) {
+ GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+ when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", "catalog"));
+
+ GravitinoMetalake metalake = mockMetalake();
+
+ CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class);
+ when(mockContext.getCatalog()).thenReturn(mockCatalog);
+ when(mockContext.getMetalake()).thenReturn(metalake);
+ when(mockContext.getInternalConnector()).thenReturn(internalConnector);
+ // Stub getConfig() to support the forward-user logic added in GravitinoConnector
+ // (see apache/gravitino PR #10730). Uses lenient() so tests still pass on branches
+ // where the constructor does not yet call getConfig().
+ lenient().when(mockContext.getConfig()).thenReturn(mock(GravitinoConfig.class));
+
+ return new GravitinoConnector(mockContext);
+ }
+
+ private GravitinoMetadata createMetadata(ConnectorMetadata internalMetadata) {
+ CatalogConnectorMetadata catalogConnectorMetadata = mock(CatalogConnectorMetadata.class);
+ CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class);
+ return new TestGravitinoMetadataImpl(
+ catalogConnectorMetadata, metadataAdapter, internalMetadata);
+ }
+
+ private static GravitinoMetalake mockMetalake() {
+ GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+ Catalog catalog = mock(Catalog.class);
+ when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+ when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+ when(metalake.loadCatalog(any())).thenReturn(catalog);
+ return metalake;
+ }
+
+ private static final class TestGravitinoMetadataImpl extends GravitinoMetadata {
+ private TestGravitinoMetadataImpl(
+ CatalogConnectorMetadata catalogConnectorMetadata,
+ CatalogConnectorMetadataAdapter metadataAdapter,
+ ConnectorMetadata internalMetadata) {
+ super(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+ }
+ }
+}