This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 416c938c8f [#10750] feat(trino-connector): Support CTAS (CREATE TABLE
AS SELECT) (#10757)
416c938c8f is described below
commit 416c938c8fbae8b1d1164622ca279457594f7ccb
Author: Akshay Thorat <[email protected]>
AuthorDate: Sun May 17 20:24:49 2026 -0700
[#10750] feat(trino-connector): Support CTAS (CREATE TABLE AS SELECT)
(#10757)
### What changes were proposed in this pull request?
Support CTAS (CREATE TABLE AS SELECT) in the Gravitino Trino connector by
implementing `beginCreateTable()`, `finishCreateTable()`, and the associated
page sink provider and handle wrapper.
The key design choice is that `beginCreateTable()` first creates the table in
the Gravitino catalog, then delegates data writing to the internal connector's
`beginInsert()` path. This avoids double table creation in the original
connector.
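The create-then-delegate flow described above can be sketched roughly as
follows. This is a minimal, dependency-free illustration and not the code in
this commit: `Catalog`, `InternalMetadata`, and `RecordingCatalog` are
hypothetical stand-ins for the Gravitino catalog and the internal connector's
metadata, and handles are modeled as plain strings. Unlike the actual change,
which logs a warning when cleanup fails, the sketch attaches the cleanup
failure as a suppressed exception.

```java
import java.util.ArrayList;
import java.util.List;

final class CtasFlowSketch {
  // Hypothetical stand-in for the Gravitino catalog (not the real Gravitino API).
  interface Catalog {
    void createTable(String tableName);

    void dropTable(String tableName);
  }

  // Hypothetical stand-in for the internal connector's metadata.
  interface InternalMetadata {
    String beginInsert(String tableName); // returns an opaque insert handle
  }

  // Register the table first, then reuse the insert path for the data write.
  static String beginCreateTable(Catalog catalog, InternalMetadata internal, String tableName) {
    catalog.createTable(tableName);
    try {
      return internal.beginInsert(tableName);
    } catch (RuntimeException e) {
      // Best-effort cleanup so a failed CTAS does not leave a registered table behind.
      try {
        catalog.dropTable(tableName);
      } catch (RuntimeException dropFailure) {
        e.addSuppressed(dropFailure);
      }
      throw e;
    }
  }

  // In-memory catalog that only records which calls were made.
  static final class RecordingCatalog implements Catalog {
    final List<String> calls = new ArrayList<>();

    @Override
    public void createTable(String tableName) {
      calls.add("create:" + tableName);
    }

    @Override
    public void dropTable(String tableName) {
      calls.add("drop:" + tableName);
    }
  }

  public static void main(String[] args) {
    RecordingCatalog catalog = new RecordingCatalog();
    String handle = beginCreateTable(catalog, name -> "insert-handle:" + name, "db.t");
    // prints "insert-handle:db.t [create:db.t]"
    System.out.println(handle + " " + catalog.calls);
  }
}
```

If the `beginInsert` stand-in throws, the recorded calls become `create:db.t`
followed by `drop:db.t`, which is the cleanup behavior the commit aims for.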
**Files changed:**
- `GravitinoMetadata.java` — `beginCreateTable()` creates table via
Gravitino, then calls `internalMetadata.beginInsert()`;
`getNewTableLayout()` delegates to internal
- `GravitinoOutputTableHandle.java` — **new** handle wrapper that wraps
`ConnectorInsertTableHandle` (implements `ConnectorOutputTableHandle`)
- `GravitinoMetadata{435,440,446,452,469,478}.java` — version-specific
`finishCreateTable()` delegating to `finishInsert()`
- `GravitinoPageSinkProvider.java` — `createPageSink(OutputTableHandle)`
unwraps to insert handle and uses insert-path sink
- `JsonCodec.java` — registered `ConnectorOutputTableHandle` in
`AbstractTypedJacksonModule` for polymorphic serialization
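
The handle wrapping listed above can be pictured with a small sketch. It is an
assumption-laden illustration rather than the contents of
`GravitinoOutputTableHandle.java`: `InsertHandle` and `OutputHandle` are
hypothetical marker interfaces standing in for Trino's
`ConnectorInsertTableHandle` and `ConnectorOutputTableHandle`, and the type
check in `unwrap` is an addition of the sketch (the diff uses a direct cast).

```java
import java.util.Objects;

final class OutputHandleSketch {
  // Hypothetical marker interfaces; the real ones live in io.trino.spi.connector.
  interface InsertHandle {}

  interface OutputHandle {}

  // Stand-in for whatever insert handle the internal connector returns.
  static final class InternalInsertHandle implements InsertHandle {
    final String table;

    InternalInsertHandle(String table) {
      this.table = table;
    }
  }

  // Presents itself to the engine as an output handle while carrying the insert handle.
  static final class WrappedOutputHandle implements OutputHandle {
    private final InsertHandle internalHandle;

    WrappedOutputHandle(InsertHandle internalHandle) {
      this.internalHandle = Objects.requireNonNull(internalHandle, "internalHandle");
    }

    InsertHandle getInternalHandle() {
      return internalHandle;
    }
  }

  // Used on the finish side and the page-sink side to recover the insert handle.
  static InsertHandle unwrap(OutputHandle handle) {
    if (handle instanceof WrappedOutputHandle) {
      return ((WrappedOutputHandle) handle).getInternalHandle();
    }
    throw new IllegalArgumentException("Unexpected handle type: " + handle.getClass().getName());
  }

  public static void main(String[] args) {
    OutputHandle out = new WrappedOutputHandle(new InternalInsertHandle("db.t"));
    InternalInsertHandle inner = (InternalInsertHandle) unwrap(out);
    System.out.println(inner.table);
  }
}
```

Because the wrapped handle is an ordinary insert handle, both
`finishCreateTable()` and the page sink can hand it straight to the existing
insert path.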
### Why are the changes needed?
CTAS is a standard SQL pattern commonly used during lakehouse migrations, ETL
development, and exploratory data work. Without it, users must use a two-step
`CREATE TABLE` + `INSERT INTO` pattern, which introduces a failure mode where
partial table registrations can be left behind if the insert fails.
Fix: #10750
### Does this PR introduce _any_ user-facing change?
Yes. Users can now execute `CREATE TABLE ... AS SELECT` queries through the
Gravitino Trino connector. Previously this returned:
`This connector does not support creating tables with data.`
### How was this patch tested?
- Added `testCreateTableAsSelect` test in `TestGravitinoConnector`
- All existing tests pass across all 6 Trino SPI version modules (435–478)
- Ran `./gradlew spotlessApply` for formatting compliance
---------
Co-authored-by: Yuhui <[email protected]>
---
docs/trino-connector/catalog-hive.md | 6 +-
docs/trino-connector/catalog-iceberg.md | 9 +-
docs/trino-connector/catalog-mysql.md | 5 +-
docs/trino-connector/catalog-postgresql.md | 5 +-
docs/trino-connector/sql-support.md | 1 +
.../trino-ci-testset/testsets/hive/00018_ctas.sql | 35 ++++++
.../trino-ci-testset/testsets/hive/00018_ctas.txt | 30 ++++++
.../testsets/jdbc-mysql/00014_ctas.sql | 35 ++++++
.../testsets/jdbc-mysql/00014_ctas.txt | 30 ++++++
.../testsets/jdbc-postgresql/00010_ctas.sql | 35 ++++++
.../testsets/jdbc-postgresql/00010_ctas.txt | 30 ++++++
.../testsets/lakehouse-iceberg/00013_ctas.sql | 57 ++++++++++
.../testsets/lakehouse-iceberg/00013_ctas.txt | 71 +++++++++++++
.../trino/connector/GravitinoMetadata435.java | 12 +++
.../trino/connector/GravitinoMetadata440.java | 13 +++
.../trino/connector/GravitinoMetadata446.java | 13 +++
.../trino/connector/GravitinoMetadata452.java | 13 +++
.../trino/connector/GravitinoMetadata469.java | 13 +++
.../trino/connector/GravitinoMetadata478.java | 13 +++
.../trino/connector/GravitinoMetadata.java | 92 ++++++++++++++++
.../connector/GravitinoOutputTableHandle.java | 102 ++++++++++++++++++
.../trino/connector/GravitinoPageSinkProvider.java | 8 +-
.../connector/catalog/hive/HivePropertyMeta.java | 3 +-
.../trino/connector/util/json/JsonCodec.java | 5 +
.../trino/connector/TestGravitinoConnector.java | 53 +++++++++
.../TestGravitinoMetadataGetNewTableLayout.java | 118 +++++++++++++++++++++
.../catalog/hive/TestHivePropertyMeta.java | 40 +++++++
27 files changed, 839 insertions(+), 8 deletions(-)
diff --git a/docs/trino-connector/catalog-hive.md
b/docs/trino-connector/catalog-hive.md
index 87b391dedf..3836c5782c 100644
--- a/docs/trino-connector/catalog-hive.md
+++ b/docs/trino-connector/catalog-hive.md
@@ -50,7 +50,11 @@ CREATE SCHEMA catalog.schema_name
### Create table
The Gravitino Trino connector currently supports basic Hive table creation
statements, such as defining fields,
-allowing null values, and adding comments. The Gravitino Trino connector does not support `CREATE TABLE AS SELECT`.
+allowing null values, and adding comments. The Gravitino Trino connector supports `CREATE TABLE AS SELECT`.
+
+:::note
+`CREATE OR REPLACE TABLE AS SELECT` is not supported. Use `DROP TABLE` followed by `CREATE TABLE AS SELECT` as an alternative.
+:::
The following example shows how to create a table in the Hive catalog:
diff --git a/docs/trino-connector/catalog-iceberg.md
b/docs/trino-connector/catalog-iceberg.md
index ae9a168c59..df74e8b737 100644
--- a/docs/trino-connector/catalog-iceberg.md
+++ b/docs/trino-connector/catalog-iceberg.md
@@ -33,7 +33,14 @@ CREATE SCHEMA catalog.schema_name
### Create table
The Apache Gravitino Trino connector currently supports basic Iceberg table
creation statements, such as defining fields,
-allowing null values, and adding comments. The Apache Gravitino Trino connector does not support `CREATE TABLE AS SELECT`.
+allowing null values, and adding comments. The Apache Gravitino Trino connector supports `CREATE TABLE AS SELECT`.
+
+:::note
+`CREATE OR REPLACE TABLE AS SELECT` is not supported. The Iceberg connector caches the table's UUID
+at query-plan time; dropping and recreating the table inside the same transaction causes the
+subsequent insert phase to detect a UUID mismatch and fail. Use `DROP TABLE` followed by
+`CREATE TABLE AS SELECT` as an alternative.
+:::
The following example shows how to create a table in the Iceberg catalog:
diff --git a/docs/trino-connector/catalog-mysql.md
b/docs/trino-connector/catalog-mysql.md
index 5aa24a1413..df56da9369 100644
--- a/docs/trino-connector/catalog-mysql.md
+++ b/docs/trino-connector/catalog-mysql.md
@@ -17,8 +17,11 @@ To connect to MySQL, you need:
## Create table
At present, the Apache Gravitino Trino connector only supports basic MySQL
table creation statements, which involve fields, null allowances, comments,
primary keys, indexes, default values and auto-increment.
+The Gravitino Trino connector supports `CREATE TABLE AS SELECT`.
-The Gravitino Trino connector does not support `CREATE TABLE AS SELECT`.
+:::note
+`CREATE OR REPLACE TABLE AS SELECT` is not supported. Use `DROP TABLE` followed by `CREATE TABLE AS SELECT` as an alternative.
+:::
## Alter table
diff --git a/docs/trino-connector/catalog-postgresql.md
b/docs/trino-connector/catalog-postgresql.md
index 15b95bcc42..a9c70c52c4 100644
--- a/docs/trino-connector/catalog-postgresql.md
+++ b/docs/trino-connector/catalog-postgresql.md
@@ -27,8 +27,11 @@ Otherwise, schema names, table names or column names
containing uppercase letter
## Create table
At present, the Apache Gravitino Trino connector only supports basic
PostgreSQL table creation statements, which involve fields, null allowances,
and comments. However, it does not support advanced features like primary keys,
indexes, default values, and auto-increment.
+The Gravitino Trino connector supports `CREATE TABLE AS SELECT`.
-The Gravitino Trino connector does not support `CREATE TABLE AS SELECT`.
+:::note
+`CREATE OR REPLACE TABLE AS SELECT` is not supported. Use `DROP TABLE` followed by `CREATE TABLE AS SELECT` as an alternative.
+:::
## Alter table
diff --git a/docs/trino-connector/sql-support.md
b/docs/trino-connector/sql-support.md
index 6e97b1de04..77528d887e 100644
--- a/docs/trino-connector/sql-support.md
+++ b/docs/trino-connector/sql-support.md
@@ -36,6 +36,7 @@ The connector provides read access and write access to data
and metadata stored
### Schema and table management
- [CREATE TABLE](https://trino.io/docs/current/sql/create-table.html)
+- [CREATE TABLE AS SELECT](https://trino.io/docs/current/sql/create-table-as.html) (`CREATE OR REPLACE TABLE AS SELECT` is not supported)
- [DROP TABLE](https://trino.io/docs/current/sql/drop-table.html)
- [ALTER TABLE](https://trino.io/docs/current/sql/alter-table.html)
- [CREATE SCHEMA](https://trino.io/docs/current/sql/create-schema.html)
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/hive/00018_ctas.sql
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/hive/00018_ctas.sql
new file mode 100644
index 0000000000..ed3ffa77b8
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/hive/00018_ctas.sql
@@ -0,0 +1,35 @@
+CREATE SCHEMA gt_hive.gt_hive_ctas_db;
+
+-- Test 1: Basic CTAS with data
+CREATE TABLE gt_hive.gt_hive_ctas_db.src_table (
+ id integer,
+ name varchar,
+ value decimal(12, 2)
+);
+
+INSERT INTO gt_hive.gt_hive_ctas_db.src_table VALUES (1, 'alice', 10.50), (2, 'bob', 20.75), (3, 'charlie', 30.00);
+
+CREATE TABLE gt_hive.gt_hive_ctas_db.ctas_basic AS SELECT * FROM gt_hive.gt_hive_ctas_db.src_table;
+
+SELECT * FROM gt_hive.gt_hive_ctas_db.ctas_basic ORDER BY id;
+
+-- Test 2: CTAS with column subset and transformation
+CREATE TABLE gt_hive.gt_hive_ctas_db.ctas_transform AS SELECT id, upper(name) AS upper_name FROM gt_hive.gt_hive_ctas_db.src_table WHERE id > 1;
+
+SELECT * FROM gt_hive.gt_hive_ctas_db.ctas_transform ORDER BY id;
+
+-- Test 3: CTAS with no data (empty source)
+CREATE TABLE gt_hive.gt_hive_ctas_db.ctas_empty AS SELECT * FROM gt_hive.gt_hive_ctas_db.src_table WHERE id < 0;
+
+SELECT count(*) FROM gt_hive.gt_hive_ctas_db.ctas_empty;
+
+-- Cleanup
+DROP TABLE gt_hive.gt_hive_ctas_db.ctas_empty;
+
+DROP TABLE gt_hive.gt_hive_ctas_db.ctas_transform;
+
+DROP TABLE gt_hive.gt_hive_ctas_db.ctas_basic;
+
+DROP TABLE gt_hive.gt_hive_ctas_db.src_table;
+
+DROP SCHEMA gt_hive.gt_hive_ctas_db;
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/hive/00018_ctas.txt
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/hive/00018_ctas.txt
new file mode 100644
index 0000000000..43da352215
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/hive/00018_ctas.txt
@@ -0,0 +1,30 @@
+CREATE SCHEMA
+
+CREATE TABLE
+
+INSERT: 3 rows
+
+CREATE TABLE: 3 rows
+
+"1","alice","10.50"
+"2","bob","20.75"
+"3","charlie","30.00"
+
+CREATE TABLE: 2 rows
+
+"2","BOB"
+"3","CHARLIE"
+
+CREATE TABLE: 0 rows
+
+"0"
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP SCHEMA
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00014_ctas.sql
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00014_ctas.sql
new file mode 100644
index 0000000000..e835adfa6b
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00014_ctas.sql
@@ -0,0 +1,35 @@
+CREATE SCHEMA gt_mysql.gt_ctas_db;
+
+-- Test 1: Basic CTAS with data
+CREATE TABLE gt_mysql.gt_ctas_db.src_table (
+ id int,
+ name varchar(200),
+ value decimal(12, 2)
+);
+
+INSERT INTO gt_mysql.gt_ctas_db.src_table VALUES (1, 'alice', 10.50), (2, 'bob', 20.75), (3, 'charlie', 30.00);
+
+CREATE TABLE gt_mysql.gt_ctas_db.ctas_basic AS SELECT * FROM gt_mysql.gt_ctas_db.src_table;
+
+SELECT * FROM gt_mysql.gt_ctas_db.ctas_basic ORDER BY id;
+
+-- Test 2: CTAS with column subset and transformation
+CREATE TABLE gt_mysql.gt_ctas_db.ctas_transform AS SELECT id, upper(name) AS upper_name FROM gt_mysql.gt_ctas_db.src_table WHERE id > 1;
+
+SELECT * FROM gt_mysql.gt_ctas_db.ctas_transform ORDER BY id;
+
+-- Test 3: CTAS with no data (empty source)
+CREATE TABLE gt_mysql.gt_ctas_db.ctas_empty AS SELECT * FROM gt_mysql.gt_ctas_db.src_table WHERE id < 0;
+
+SELECT count(*) FROM gt_mysql.gt_ctas_db.ctas_empty;
+
+-- Cleanup
+DROP TABLE gt_mysql.gt_ctas_db.ctas_empty;
+
+DROP TABLE gt_mysql.gt_ctas_db.ctas_transform;
+
+DROP TABLE gt_mysql.gt_ctas_db.ctas_basic;
+
+DROP TABLE gt_mysql.gt_ctas_db.src_table;
+
+DROP SCHEMA gt_mysql.gt_ctas_db;
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00014_ctas.txt
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00014_ctas.txt
new file mode 100644
index 0000000000..43da352215
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00014_ctas.txt
@@ -0,0 +1,30 @@
+CREATE SCHEMA
+
+CREATE TABLE
+
+INSERT: 3 rows
+
+CREATE TABLE: 3 rows
+
+"1","alice","10.50"
+"2","bob","20.75"
+"3","charlie","30.00"
+
+CREATE TABLE: 2 rows
+
+"2","BOB"
+"3","CHARLIE"
+
+CREATE TABLE: 0 rows
+
+"0"
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP SCHEMA
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00010_ctas.sql
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00010_ctas.sql
new file mode 100644
index 0000000000..595eaf54c4
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00010_ctas.sql
@@ -0,0 +1,35 @@
+CREATE SCHEMA gt_postgresql.gt_ctas_db;
+
+-- Test 1: Basic CTAS with data
+CREATE TABLE gt_postgresql.gt_ctas_db.src_table (
+ id int,
+ name varchar,
+ value decimal(12, 2)
+);
+
+INSERT INTO gt_postgresql.gt_ctas_db.src_table VALUES (1, 'alice', 10.50), (2, 'bob', 20.75), (3, 'charlie', 30.00);
+
+CREATE TABLE gt_postgresql.gt_ctas_db.ctas_basic AS SELECT * FROM gt_postgresql.gt_ctas_db.src_table;
+
+SELECT * FROM gt_postgresql.gt_ctas_db.ctas_basic ORDER BY id;
+
+-- Test 2: CTAS with column subset and transformation
+CREATE TABLE gt_postgresql.gt_ctas_db.ctas_transform AS SELECT id, upper(name) AS upper_name FROM gt_postgresql.gt_ctas_db.src_table WHERE id > 1;
+
+SELECT * FROM gt_postgresql.gt_ctas_db.ctas_transform ORDER BY id;
+
+-- Test 3: CTAS with no data (empty source)
+CREATE TABLE gt_postgresql.gt_ctas_db.ctas_empty AS SELECT * FROM gt_postgresql.gt_ctas_db.src_table WHERE id < 0;
+
+SELECT count(*) FROM gt_postgresql.gt_ctas_db.ctas_empty;
+
+-- Cleanup
+DROP TABLE gt_postgresql.gt_ctas_db.ctas_empty;
+
+DROP TABLE gt_postgresql.gt_ctas_db.ctas_transform;
+
+DROP TABLE gt_postgresql.gt_ctas_db.ctas_basic;
+
+DROP TABLE gt_postgresql.gt_ctas_db.src_table;
+
+DROP SCHEMA gt_postgresql.gt_ctas_db;
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00010_ctas.txt
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00010_ctas.txt
new file mode 100644
index 0000000000..43da352215
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00010_ctas.txt
@@ -0,0 +1,30 @@
+CREATE SCHEMA
+
+CREATE TABLE
+
+INSERT: 3 rows
+
+CREATE TABLE: 3 rows
+
+"1","alice","10.50"
+"2","bob","20.75"
+"3","charlie","30.00"
+
+CREATE TABLE: 2 rows
+
+"2","BOB"
+"3","CHARLIE"
+
+CREATE TABLE: 0 rows
+
+"0"
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP SCHEMA
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_ctas.sql
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_ctas.sql
new file mode 100644
index 0000000000..36908327ea
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_ctas.sql
@@ -0,0 +1,57 @@
+CREATE SCHEMA gt_ctas_db;
+
+-- Test 1: Basic CTAS with data
+CREATE TABLE gt_ctas_db.src_table (
+ id integer,
+ name varchar,
+ value decimal(12, 2)
+);
+
+INSERT INTO gt_ctas_db.src_table VALUES (1, 'alice', 10.50), (2, 'bob', 20.75), (3, 'charlie', 30.00);
+
+CREATE TABLE gt_ctas_db.ctas_basic AS SELECT * FROM gt_ctas_db.src_table;
+
+SELECT * FROM gt_ctas_db.ctas_basic ORDER BY id;
+
+-- Test 2: CTAS with column subset and transformation
+CREATE TABLE gt_ctas_db.ctas_transform AS SELECT id, upper(name) AS upper_name FROM gt_ctas_db.src_table WHERE id > 1;
+
+SELECT * FROM gt_ctas_db.ctas_transform ORDER BY id;
+
+-- Test 3: CTAS with table properties
+CREATE TABLE gt_ctas_db.ctas_with_props
+WITH (format = 'ORC')
+AS SELECT * FROM gt_ctas_db.src_table;
+
+SELECT * FROM gt_ctas_db.ctas_with_props ORDER BY id;
+
+show create table gt_ctas_db.ctas_with_props;
+
+-- Test 4: CTAS with no data (empty source)
+CREATE TABLE gt_ctas_db.ctas_empty AS SELECT * FROM gt_ctas_db.src_table WHERE id < 0;
+
+SELECT count(*) FROM gt_ctas_db.ctas_empty;
+
+-- Test 5: CTAS with partitioning
+CREATE TABLE gt_ctas_db.ctas_partitioned
+WITH (partitioning = ARRAY['name'])
+AS SELECT * FROM gt_ctas_db.src_table;
+
+SELECT * FROM gt_ctas_db.ctas_partitioned ORDER BY id;
+
+show create table gt_ctas_db.ctas_partitioned;
+
+-- Cleanup
+DROP TABLE gt_ctas_db.ctas_partitioned;
+
+DROP TABLE gt_ctas_db.ctas_empty;
+
+DROP TABLE gt_ctas_db.ctas_with_props;
+
+DROP TABLE gt_ctas_db.ctas_transform;
+
+DROP TABLE gt_ctas_db.ctas_basic;
+
+DROP TABLE gt_ctas_db.src_table;
+
+DROP SCHEMA gt_ctas_db;
diff --git
a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_ctas.txt
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_ctas.txt
new file mode 100644
index 0000000000..7b22b879f7
--- /dev/null
+++
b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_ctas.txt
@@ -0,0 +1,71 @@
+CREATE SCHEMA
+
+CREATE TABLE
+
+INSERT: 3 rows
+
+CREATE TABLE: 3 rows
+
+"1","alice","10.50"
+"2","bob","20.75"
+"3","charlie","30.00"
+
+CREATE TABLE: 2 rows
+
+"2","BOB"
+"3","CHARLIE"
+
+CREATE TABLE: 3 rows
+
+"1","alice","10.50"
+"2","bob","20.75"
+"3","charlie","30.00"
+
+"CREATE TABLE %.gt_ctas_db.ctas_with_props (
+ id integer,
+ name varchar,
+ value decimal(12, 2)
+)
+COMMENT ''
+WITH (
+ format = 'ORC',
+ format_version = '2',
+ location = 'hdfs://%/user/iceberg/warehouse/TrinoQueryIT/gt_ctas_db%/ctas_with_props'
+)"
+
+CREATE TABLE: 0 rows
+
+"0"
+
+CREATE TABLE: 3 rows
+
+"1","alice","10.50"
+"2","bob","20.75"
+"3","charlie","30.00"
+
+"CREATE TABLE %.gt_ctas_db.ctas_partitioned (
+ id integer,
+ name varchar,
+ value decimal(12, 2)
+)
+COMMENT ''
+WITH (
+ format = 'PARQUET',
+ format_version = '2',
+ location = 'hdfs://%/user/iceberg/warehouse/TrinoQueryIT/gt_ctas_db%/ctas_partitioned',
+ partitioning = ARRAY['name']
+)"
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP TABLE
+
+DROP SCHEMA
diff --git
a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
index fdb584bb24..e6b8ff0db6 100644
---
a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
+++
b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java
@@ -23,6 +23,7 @@ import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -85,6 +86,17 @@ public class GravitinoMetadata435 extends GravitinoMetadata {
session, GravitinoHandle.unWrap(insertHandle), fragments,
computedStatistics);
}
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
+ return internalMetadata.finishInsert(session, insertHandle, fragments, computedStatistics);
+ }
+
@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode
retryMode) {
diff --git
a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
index 530231cdc9..8734d52378 100644
---
a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
+++
b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java
@@ -23,6 +23,7 @@ import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -92,6 +93,18 @@ public class GravitinoMetadata440 extends GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
+ return internalMetadata.finishInsert(
+ session, insertHandle, List.of(), fragments, computedStatistics);
+ }
+
@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode
retryMode) {
diff --git
a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
index 6dbc31b041..07a871dcd3 100644
---
a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
+++
b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java
@@ -23,6 +23,7 @@ import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -92,6 +93,18 @@ public class GravitinoMetadata446 extends GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
+ return internalMetadata.finishInsert(
+ session, insertHandle, List.of(), fragments, computedStatistics);
+ }
+
@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode
retryMode) {
diff --git
a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
index a36249d71a..31fb9159eb 100644
---
a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
+++
b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java
@@ -23,6 +23,7 @@ import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -69,6 +70,18 @@ public class GravitinoMetadata452 extends GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
+ return internalMetadata.finishInsert(
+ session, insertHandle, List.of(), fragments, computedStatistics);
+ }
+
@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
ConnectorSession session,
diff --git
a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
index ae40f3b58a..b95358e1de 100644
---
a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
+++
b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java
@@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -75,6 +76,18 @@ public class GravitinoMetadata469 extends GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
+ return internalMetadata.finishInsert(
+ session, insertHandle, List.of(), fragments, computedStatistics);
+ }
+
@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
ConnectorSession session,
diff --git
a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
index c67be1527a..261621cdfa 100644
---
a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
+++
b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java
@@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -111,6 +112,18 @@ public class GravitinoMetadata478 extends
GravitinoMetadata {
computedStatistics);
}
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
+ return internalMetadata.finishInsert(
+ session, insertHandle, List.of(), fragments, computedStatistics);
+ }
+
@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session,
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index bc37c12d60..04a264e077 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -18,6 +18,7 @@
*/
package org.apache.gravitino.trino.connector;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_COLUMN_NOT_EXISTS;
import static
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_TABLE_NOT_EXISTS;
@@ -33,6 +34,7 @@ import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
@@ -223,6 +225,96 @@ public abstract class GravitinoMetadata implements
ConnectorMetadata {
catalogConnectorMetadata.createTable(table, saveMode == SaveMode.IGNORE);
}
+ @Override
+ public ConnectorOutputTableHandle beginCreateTable(
+ ConnectorSession session,
+ ConnectorTableMetadata tableMetadata,
+ Optional<ConnectorTableLayout> layout,
+ RetryMode retryMode,
+ boolean replace) {
+ // CREATE OR REPLACE TABLE AS SELECT is not supported because the Iceberg internal connector
+ // caches the table's UUID at query-plan time. When replace=true, we would need to drop and
+ // recreate the table inside beginCreateTable; however, the subsequent beginInsert call invokes
+ // beginTransaction -> refresh(), which compares the cached UUID against the newly created
+ // table's UUID and throws IllegalStateException ("Table UUID does not match"). There is no
+ // public API in the internal connector to reset this cache, so we reject replace=true with
+ // NOT_SUPPORTED rather than expose a broken code path.
+ if (replace) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing a table");
+ }
+
+ SchemaTableName tableName = tableMetadata.getTable();
+
+ // Create the table in the Gravitino catalog
+ GravitinoTable table = metadataAdapter.createTable(tableMetadata);
+ catalogConnectorMetadata.createTable(table, false);
+ try {
+ // Get the table handle from the internal connector for the newly created table
+ ConnectorTableHandle internalTableHandle =
+ internalMetadata.getTableHandle(session, tableName, Optional.empty(), Optional.empty());
+ if (internalTableHandle == null) {
+ throw new TrinoException(
+ GRAVITINO_TABLE_NOT_EXISTS,
+ "Internal connector could not find newly created table: " + tableName);
+ }
+
+ // Build column list in the same order as tableMetadata to preserve column ordering
+ Map<String, ColumnHandle> internalColumnHandles =
+ internalMetadata.getColumnHandles(session, internalTableHandle);
+ List<ColumnHandle> columns = new ArrayList<>(tableMetadata.getColumns().size());
+ for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
+ ColumnHandle handle = internalColumnHandles.get(columnMetadata.getName());
+ if (handle == null) {
+ throw new TrinoException(
+ GRAVITINO_COLUMN_NOT_EXISTS,
+ "Column '"
+ + columnMetadata.getName()
+ + "' not found in internal connector for table: "
+ + tableName);
+ }
+ columns.add(handle);
+ }
+
+ // Delegate to the internal connector's insert path to write data,
+ // avoiding double table creation in the original connector
+ ConnectorInsertTableHandle insertTableHandle =
+ internalMetadata.beginInsert(session, internalTableHandle, columns, retryMode);
+ return new GravitinoOutputTableHandle(insertTableHandle, tableName);
+ } catch (Exception e) {
+ // Clean up the table created in the Gravitino catalog on failure
+ try {
+ catalogConnectorMetadata.dropTable(tableName);
+ } catch (Exception dropException) {
+ LOG.warn("Failed to drop table {} during CTAS cleanup", tableName, dropException);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Optional<ConnectorTableLayout> getNewTableLayout(
+ ConnectorSession session, ConnectorTableMetadata tableMetadata) {
+ try {
+ return internalMetadata
+ .getNewTableLayout(session, tableMetadata)
+ .map(
+ result ->
+ result.getPartitioning().isPresent()
+ ? new ConnectorTableLayout(
+ new GravitinoPartitioningHandle(result.getPartitioning().get()),
+ result.getPartitionColumns(),
+ result.supportsMultipleWritersPerPartition())
+ : new ConnectorTableLayout(result.getPartitionColumns()));
+ } catch (ClassCastException e) {
+ // Property type mismatch between Gravitino's and the internal connector's metadata
+ // (e.g., Hive 'format' is a String in Gravitino but HiveStorageFormat enum internally).
+ // Returning empty is correct for non-bucketed CTAS.
+ LOG.debug(
+ "Skipping internal getNewTableLayout due to property type mismatch: {}", e.getMessage());
+ return Optional.empty();
+ }
+ }
+
@Override
public void createSchema(
ConnectorSession session,
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoOutputTableHandle.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoOutputTableHandle.java
new file mode 100644
index 0000000000..b1fc6f406f
--- /dev/null
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoOutputTableHandle.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.SchemaTableName;
+
+/**
+ * The GravitinoOutputTableHandle is used for handling CTAS (CREATE TABLE AS SELECT) operations.
+ *
+ * <p>Internally wraps a {@link ConnectorInsertTableHandle} because the Gravitino connector creates
+ * the table first via catalogConnectorMetadata, then delegates data writing to the internal
+ * connector's insert path, avoiding double table creation.
+ *
+ * <p>Also stores the {@link SchemaTableName} for table identification during the CTAS lifecycle.
+ */
+public class GravitinoOutputTableHandle
+ implements ConnectorOutputTableHandle, GravitinoHandle<ConnectorInsertTableHandle> {
+
+ private final String schemaName;
+ private final String tableName;
+
+ private HandleWrapper<ConnectorInsertTableHandle> handleWrapper =
+ new HandleWrapper<>(ConnectorInsertTableHandle.class);
+
+ /**
+ * Constructs a new GravitinoOutputTableHandle from serialized properties.
+ *
+ * @param handleString the serialized handle string
+ * @param schemaName the schema name of the created table
+ * @param tableName the table name of the created table
+ */
+ @JsonCreator
+ public GravitinoOutputTableHandle(
+ @JsonProperty(HANDLE_STRING) String handleString,
+ @JsonProperty("schemaName") String schemaName,
+ @JsonProperty("tableName") String tableName) {
+ this.handleWrapper = handleWrapper.fromJson(handleString);
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructs a new GravitinoOutputTableHandle from a ConnectorInsertTableHandle.
+ *
+ * @param insertTableHandle the internal connector insert table handle
+ * @param tableName the schema-qualified table name
+ */
+ public GravitinoOutputTableHandle(
+ ConnectorInsertTableHandle insertTableHandle, SchemaTableName tableName) {
+ this.handleWrapper = new HandleWrapper<>(insertTableHandle);
+ this.schemaName = tableName.getSchemaName();
+ this.tableName = tableName.getTableName();
+ }
+
+ @JsonProperty
+ @Override
+ public String getHandleString() {
+ return handleWrapper.toJson();
+ }
+
+ @Override
+ public ConnectorInsertTableHandle getInternalHandle() {
+ return handleWrapper.getHandle();
+ }
+
+ /** Returns the schema name of the table created during CTAS. */
+ @JsonProperty
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ /** Returns the table name of the table created during CTAS. */
+ @JsonProperty
+ public String getTableName() {
+ return tableName;
+ }
+
+ /** Returns the schema-qualified table name. */
+ public SchemaTableName toSchemaTableName() {
+ return new SchemaTableName(schemaName, tableName);
+ }
+}
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
index 7fc546ef68..2f8ed0f77c 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java
@@ -28,7 +28,6 @@ import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
-import org.apache.commons.lang3.NotImplementedException;
/** This class provides a ConnectorPageSink for Trino to write data to internal connector. */
public class GravitinoPageSinkProvider implements ConnectorPageSinkProvider {
@@ -50,7 +49,12 @@ public class GravitinoPageSinkProvider implements ConnectorPageSinkProvider {
ConnectorSession session,
ConnectorOutputTableHandle outputTableHandle,
ConnectorPageSinkId pageSinkId) {
- throw new NotImplementedException();
+ // GravitinoOutputTableHandle wraps a ConnectorInsertTableHandle internally,
+ // so delegate to the insert-path createPageSink
+ ConnectorInsertTableHandle insertHandle =
+ ((GravitinoOutputTableHandle) outputTableHandle).getInternalHandle();
+ return pageSinkProvider.createPageSink(
+ GravitinoHandle.unWrap(transactionHandle), session, insertHandle, pageSinkId);
}
@Override
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java
index d760c96f43..a8afa4466b 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java
@@ -101,8 +101,7 @@ public class HivePropertyMeta implements HasPropertyMeta {
.map(name -> ((String) name).toLowerCase(ENGLISH))
.collect(toImmutableList()),
value -> value),
- integerProperty(
- HIVE_BUCKET_COUNT_KEY, "The number of buckets for the table", null, false),
+ integerProperty(HIVE_BUCKET_COUNT_KEY, "The number of buckets for the table", 0, false),
new PropertyMetadata<>(
HIVE_SORT_ORDER_KEY,
"Bucket sorting columns",
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
index 67fe43be03..0566858223 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/JsonCodec.java
@@ -37,6 +37,7 @@ import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
@@ -165,6 +166,10 @@ public class JsonCodec {
new AbstractTypedJacksonModule<>(
ConnectorInsertTableHandle.class, nameResolver, classResolver) {});
+ objectMapper.registerModule(
+ new AbstractTypedJacksonModule<>(
+ ConnectorOutputTableHandle.class, nameResolver, classResolver) {});
+
objectMapper.registerModule(
new AbstractTypedJacksonModule<>(
ConnectorMergeTableHandle.class, nameResolver, classResolver) {});
diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
index 710bf2a0ff..fecc1fd9cc 100644
--- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
+++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
@@ -140,6 +140,59 @@ public abstract class TestGravitinoConnector extends AbstractGravitinoConnectorT
dropTestTable(fullTableName2);
}
+ @Test
+ public void testCreateTableAsSelect() throws Exception {
+ String sourceTable = "\"memory\".db_01.tb_src";
+ String ctasTable = "\"memory\".db_01.tb_ctas";
+
+ createTestTable(sourceTable);
+
+ // Prepare source data
+ assertUpdate(
+ String.format(
+ "insert into %s (a, b) values ('Alice', 1), ('Bob', 2), ('Charlie', 3)", sourceTable),
+ 3);
+
+ // Create table as select (CTAS)
+ assertUpdate(String.format("create table %s as select * from %s", ctasTable, sourceTable), 3);
+
+ // Verify the CTAS table contains the expected data
+ MaterializedResult result = computeActual("select * from " + ctasTable);
+ assertEquals(result.getRowCount(), 3);
+
+ // Verify schema matches
+ assertThat((String) computeScalar("show create table " + ctasTable))
+ .contains("a varchar")
+ .contains("b integer");
+
+ // Cleanup
+ dropTestTable(ctasTable);
+ dropTestTable(sourceTable);
+ }
+
+ @Test
+ public void testCreateTableAsSelectEmpty() throws Exception {
+ String sourceTable = "\"memory\".db_01.tb_src_empty";
+ String ctasTable = "\"memory\".db_01.tb_ctas_empty";
+
+ createTestTable(sourceTable);
+
+ // Create table as select from an empty source table
+ assertUpdate(String.format("create table %s as select * from %s", ctasTable, sourceTable), 0);
+
+ // Verify the CTAS table is empty but exists with the correct schema
+ MaterializedResult result = computeActual("select * from " + ctasTable);
+ assertEquals(result.getRowCount(), 0);
+
+ assertThat((String) computeScalar("show create table " + ctasTable))
+ .contains("a varchar")
+ .contains("b integer");
+
+ // Cleanup
+ dropTestTable(ctasTable);
+ dropTestTable(sourceTable);
+ }
+
@Test
public void testAlterTable() throws Exception {
String fullTableName1 = "\"memory\".db_01.tb_01";
diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetNewTableLayout.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetNewTableLayout.java
new file mode 100644
index 0000000000..3815eec2fd
--- /dev/null
+++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetNewTableLayout.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableLayout;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoMetadataGetNewTableLayout {
+
+ @Test
+ public void testGetNewTableLayoutReturnsEmptyOnClassCastException() {
+ CatalogConnectorMetadata catalogConnectorMetadata = mock(CatalogConnectorMetadata.class);
+ CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class);
+ ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+ ConnectorSession session = mock(ConnectorSession.class);
+
+ ConnectorTableMetadata tableMetadata =
+ new ConnectorTableMetadata(
+ new SchemaTableName("test_schema", "test_table"), Collections.emptyList());
+
+ when(internalMetadata.getNewTableLayout(
+ any(ConnectorSession.class), any(ConnectorTableMetadata.class)))
+ .thenThrow(new ClassCastException("String cannot be cast to HiveStorageFormat"));
+
+ GravitinoMetadata metadata =
+ new StubGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+
+ Optional<ConnectorTableLayout> result = metadata.getNewTableLayout(session, tableMetadata);
+
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testGetNewTableLayoutDelegatesToInternalWhenCompatible() {
+ CatalogConnectorMetadata catalogConnectorMetadata = mock(CatalogConnectorMetadata.class);
+ CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class);
+ ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+ ConnectorSession session = mock(ConnectorSession.class);
+
+ ConnectorTableMetadata tableMetadata =
+ new ConnectorTableMetadata(
+ new SchemaTableName("test_schema", "test_table"), Collections.emptyList());
+
+ when(internalMetadata.getNewTableLayout(
+ any(ConnectorSession.class), any(ConnectorTableMetadata.class)))
+ .thenReturn(Optional.empty());
+
+ GravitinoMetadata metadata =
+ new StubGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+
+ Optional<ConnectorTableLayout> result = metadata.getNewTableLayout(session, tableMetadata);
+
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testGetNewTableLayoutReturnsLayoutWithPartitionColumns() {
+ CatalogConnectorMetadata catalogConnectorMetadata = mock(CatalogConnectorMetadata.class);
+ CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class);
+ ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+ ConnectorSession session = mock(ConnectorSession.class);
+
+ ConnectorTableMetadata tableMetadata =
+ new ConnectorTableMetadata(
+ new SchemaTableName("test_schema", "test_table"), Collections.emptyList());
+
+ ConnectorTableLayout internalLayout = new ConnectorTableLayout(Collections.singletonList("id"));
+ when(internalMetadata.getNewTableLayout(
+ any(ConnectorSession.class), any(ConnectorTableMetadata.class)))
+ .thenReturn(Optional.of(internalLayout));
+
+ GravitinoMetadata metadata =
+ new StubGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+
+ Optional<ConnectorTableLayout> result = metadata.getNewTableLayout(session, tableMetadata);
+
+ assertTrue(result.isPresent());
+ }
+
+ private static final class StubGravitinoMetadata extends GravitinoMetadata {
+ private StubGravitinoMetadata(
+ CatalogConnectorMetadata catalogConnectorMetadata,
+ CatalogConnectorMetadataAdapter metadataAdapter,
+ ConnectorMetadata internalMetadata) {
+ super(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+ }
+ }
+}
diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHivePropertyMeta.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHivePropertyMeta.java
new file mode 100644
index 0000000000..58b889f8ba
--- /dev/null
+++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHivePropertyMeta.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.trino.connector.catalog.hive;
+
+import io.trino.spi.session.PropertyMetadata;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestHivePropertyMeta {
+
+ @Test
+ public void testBucketCountDefaultValue() {
+ PropertyMetadata<?> bucketCountProperty =
+ new HivePropertyMeta()
+ .getTablePropertyMetadata().stream()
+ .filter(
+ property -> property.getName().equals(HivePropertyMeta.HIVE_BUCKET_COUNT_KEY))
+ .findFirst()
+ .orElseThrow();
+
+ Assertions.assertEquals(0, bucketCountProperty.getDefaultValue());
+ }
+}