Copilot commented on code in PR #9678:
URL: https://github.com/apache/gravitino/pull/9678#discussion_r2781095605


##########
docs/lakehouse-generic-delta-table.md:
##########
@@ -0,0 +1,390 @@
+---
+title: "Delta Lake table support"
+slug: /delta-table-support
+keywords:
+- lakehouse
+- delta
+- delta lake
+- metadata
+- generic catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+## Overview
+
+This document describes how to use Apache Gravitino to manage a generic lakehouse catalog using Delta Lake as the underlying table format. Gravitino supports registering and managing metadata for external Delta tables.
+
+:::info Current Support
+Gravitino currently supports **external Delta tables only**. This means:
+- You can register existing Delta tables in Gravitino
+- Gravitino manages metadata only (schema, location, properties)
+- The physical Delta table data remains independent
+- Dropping tables from Gravitino does not delete the underlying Delta data
+:::
+
+## Table Management
+
+### Supported Operations
+
+For Delta tables in a Generic Lakehouse Catalog, the following table summarizes supported operations:
+
+| Operation | Support Status                                 |
+|-----------|------------------------------------------------|
+| List      | ✅ Full                                         |
+| Load      | ✅ Full                                         |
+| Alter     | ❌ Not supported (use Delta Lake APIs directly) |
+| Create    | ✅ Register external tables only                |
+| Drop      | ✅ Metadata only (data preserved)               |
+| Purge     | ❌ Not supported for external tables            |
+
+:::note Feature Limitations
+- **External Tables Only:** Must set `external=true` when creating Delta tables
+- **Alter Operations:** Not supported; modify tables using Delta Lake APIs or Spark, then update Gravitino metadata if needed
+- **Purge:** Not applicable for external tables; use DROP to remove metadata only
+- **Partitioning:** Identity partitions supported as metadata only; user must ensure consistency with actual Delta table
+- **Sort Orders:** Not supported in CREATE TABLE
+- **Distributions:** Not supported in CREATE TABLE
+- **Indexes:** Not supported in CREATE TABLE
+:::
+
+### Data Type Mappings
+
+Delta Lake uses Apache Spark data types. The following table shows type mappings between Gravitino and Delta/Spark:
+
+| Gravitino Type      | Delta/Spark Type       | Notes                           |
+|---------------------|------------------------|---------------------------------|
+| `Boolean`           | `BooleanType`          |                                 |
+| `Byte`              | `ByteType`             |                                 |
+| `Short`             | `ShortType`            |                                 |
+| `Integer`           | `IntegerType`          |                                 |
+| `Long`              | `LongType`             |                                 |
+| `Float`             | `FloatType`            |                                 |
+| `Double`            | `DoubleType`           |                                 |
+| `Decimal(p, s)`     | `DecimalType(p, s)`    |                                 |
+| `String`            | `StringType`           |                                 |
+| `Binary`            | `BinaryType`           |                                 |
+| `Date`              | `DateType`             |                                 |
+| `Timestamp`         | `TimestampNTZType`     | No timezone, Spark 3.4+         |
+| `Timestamp_tz`      | `TimestampType`        | With timezone                   |
+| `List`              | `ArrayType`            |                                 |
+| `Map`               | `MapType`              |                                 |
+| `Struct`            | `StructType`           |                                 |
+
+### Table Properties
+
+Required and optional properties for Delta tables in a Generic Lakehouse Catalog:
+
+| Property   | Description | Default | Required | Since Version |
+|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|----------|---------------|
+| `format`   | Table format: must be `delta` | (none)  | Yes      | 1.2.0         |
+| `location` | Storage path for the Delta table. Must point to a directory containing Delta Lake metadata (_delta_log). Supports file://, s3://, hdfs://, abfs://, gs://, and other Hadoop-compatible file systems. | (none)  | Yes      | 1.2.0         |
+| `external` | Must be `true` for Delta tables. Indicates that Gravitino manages metadata only <br/>and will not delete physical data when the table is dropped. | (none)  | Yes      | 1.2.0         |
+
+**Location Requirement:** Must be specified at table level for external Delta table. See [Location Resolution](./lakehouse-generic-catalog.md#key-property-location).

Review Comment:
   This statement says the `location` must be specified at the *table* level 
for external Delta tables, but the generic lakehouse catalog resolves location 
from table/schema/catalog properties (and may auto-concatenate the table name) 
when not provided at the table level. Please clarify the doc to match actual 
resolution behavior, or enforce a table-level `location` requirement for Delta 
external tables in code.
   ```suggestion
   **Location Requirement:** Every external Delta table must have a resolvable location, which may be provided via table, schema, or catalog properties; when not set at the table level, the Generic Lakehouse Catalog applies its standard [Location Resolution](./lakehouse-generic-catalog.md#key-property-location) semantics (including possible auto-concatenation of the table name).
   ```
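
To make the precedence concrete, here is a minimal sketch of the lookup order described in this comment: table properties first, then schema, then catalog. It is not Gravitino's actual implementation. The class name, the `resolveLocation` signature, and the exact path layout used when a location is inherited (schema location plus table name, catalog location plus schema and table name) are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of location resolution; names and path layout are assumptions.
class LocationResolutionSketch {
  static final String LOCATION = "location";

  /** Returns the first usable location, checking table, then schema, then catalog properties. */
  static Optional<String> resolveLocation(
      String schemaName,
      String tableName,
      Map<String, String> tableProps,
      Map<String, String> schemaProps,
      Map<String, String> catalogProps) {
    String tableLocation = tableProps.get(LOCATION);
    if (isSet(tableLocation)) {
      // An explicit table-level location is used as-is.
      return Optional.of(tableLocation);
    }
    String schemaLocation = schemaProps.get(LOCATION);
    if (isSet(schemaLocation)) {
      // Assumed layout: the table name is appended to the schema location.
      return Optional.of(join(schemaLocation, tableName));
    }
    String catalogLocation = catalogProps.get(LOCATION);
    if (isSet(catalogLocation)) {
      // Assumed layout: schema name and table name are appended to the catalog location.
      return Optional.of(join(join(catalogLocation, schemaName), tableName));
    }
    // Nothing resolvable at any level: the caller should reject the request.
    return Optional.empty();
  }

  static boolean isSet(String value) {
    return value != null && !value.trim().isEmpty();
  }

  static String join(String base, String child) {
    return base.endsWith("/") ? base + child : base + "/" + child;
  }

  public static void main(String[] args) {
    Map<String, String> none = Map.of();
    // Table-level value wins over anything inherited.
    System.out.println(
        resolveLocation("db", "t", Map.of(LOCATION, "s3://bucket/custom"), none, none));
    // No table-level value: fall back to the schema location plus the table name.
    System.out.println(
        resolveLocation("db", "t", none, Map.of(LOCATION, "s3://bucket/db"), none));
    // No location anywhere: empty result.
    System.out.println(resolveLocation("db", "t", none, none, none));
  }
}
```

If the real behavior matches this shape, the doc wording "must have a resolvable location" is accurate, and "must be specified at table level" is only accurate if the code rejects the second and third cases for Delta tables.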



##########
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/TestDeltaTableOperations.java:
##########
@@ -0,0 +1,303 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.IdGenerator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestDeltaTableOperations {
+
+  @TempDir private java.nio.file.Path tempDir;
+
+  private DeltaTableOperations deltaTableOps;
+  private EntityStore store;
+  private ManagedSchemaOperations schemaOps;
+  private IdGenerator idGenerator;
+
+  @BeforeEach
+  public void setUp() {
+    store = mock(EntityStore.class);
+    schemaOps = mock(ManagedSchemaOperations.class);
+    idGenerator = mock(IdGenerator.class);
+    deltaTableOps = spy(new DeltaTableOperations(store, schemaOps, idGenerator));
+  }

Review Comment:
   `deltaTableOps` is created as a Mockito `spy`, but the test file doesn't 
stub/verify any behavior on the spy. Using a plain instance (no spy) would 
simplify the setup and avoid confusion about partial mocking.



##########
docs/lakehouse-generic-delta-table.md:
##########
@@ -0,0 +1,390 @@
+---
+title: "Delta Lake table support"
+slug: /delta-table-support
+keywords:
+- lakehouse
+- delta
+- delta lake
+- metadata
+- generic catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+## Overview
+
+This document describes how to use Apache Gravitino to manage a generic lakehouse catalog using Delta Lake as the underlying table format. Gravitino supports registering and managing metadata for external Delta tables.
+
+:::info Current Support
+Gravitino currently supports **external Delta tables only**. This means:
+- You can register existing Delta tables in Gravitino
+- Gravitino manages metadata only (schema, location, properties)
+- The physical Delta table data remains independent
+- Dropping tables from Gravitino does not delete the underlying Delta data
+:::
+
+## Table Management
+
+### Supported Operations
+
+For Delta tables in a Generic Lakehouse Catalog, the following table summarizes supported operations:
+
+| Operation | Support Status                                 |
+|-----------|------------------------------------------------|
+| List      | ✅ Full                                         |
+| Load      | ✅ Full                                         |
+| Alter     | ❌ Not supported (use Delta Lake APIs directly) |
+| Create    | ✅ Register external tables only                |
+| Drop      | ✅ Metadata only (data preserved)               |
+| Purge     | ❌ Not supported for external tables            |
+

Review Comment:
   The markdown tables here use a double leading pipe (e.g., `|| Operation | 
...`). Standard Markdown/Docusaurus tables expect a single leading pipe (`| 
Operation | ...`); the current formatting will render as an extra empty column 
or may not render correctly. Please adjust the table rows to use single pipes 
consistently.



##########
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/integration/test/CatalogGenericCatalogDeltaIT.java:
##########
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta.integration.test;
+
+import com.google.common.collect.Maps;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.delta.DeltaConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for Delta table support in Gravitino generic lakehouse catalog.
+ *
+ * <p>These tests verify:
+ *
+ * <ul>
+ *   <li>Creating a physical Delta table using Delta Kernel
+ *   <li>Registering the Delta table in Gravitino catalog
+ *   <li>Loading table metadata from Gravitino
+ *   <li>Reading actual Delta table using location from Gravitino metadata
+ *   <li>Verifying table still exists after dropping from Gravitino (metadata-only drop)
+ * </ul>
+ */
+public class CatalogGenericCatalogDeltaIT extends BaseIT {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogGenericCatalogDeltaIT.class);
+  public static final String METALAKE_NAME =
+      GravitinoITUtils.genRandomName("CatalogGenericDeltaIT_metalake");
+
+  public String catalogName = GravitinoITUtils.genRandomName("CatalogGenericDeltaIT_catalog");
+  public String SCHEMA_PREFIX = "CatalogGenericDelta_schema";
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  public String TABLE_PREFIX = "CatalogGenericDelta_table";
+  public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+  public static final String TABLE_COMMENT = "Delta table comment";
+  public static final String COL_NAME1 = "id";
+  public static final String COL_NAME2 = "name";
+  protected final String provider = "lakehouse-generic";
+  protected GravitinoMetalake metalake;
+  protected Catalog catalog;
+  protected String tempDirectory;
+  protected Engine deltaEngine;
+
+  @BeforeAll
+  public void startup() throws Exception {
+    createMetalake();
+    createCatalog();
+    createSchema();
+
+    Path tempDir = Files.createTempDirectory("deltaTempDir");
+    tempDirectory = tempDir.toString();
+
+    deltaEngine = DefaultEngine.create(new Configuration());
+  }
+
+  @AfterAll
+  public void stop() throws IOException {
+    if (client != null) {
+      Arrays.stream(catalog.asSchemas().listSchemas())
+          .filter(schema -> !schema.equals("default"))
+          .forEach(
+              (schema -> {
+                catalog.asSchemas().dropSchema(schema, true);
+              }));
+      Arrays.stream(metalake.listCatalogs())
+          .forEach(
+              catalogName -> {
+                metalake.dropCatalog(catalogName, true);
+              });
+      client.dropMetalake(METALAKE_NAME, true);
+    }
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close CloseableGroup", e);
+    }
+
+    client = null;
+
+    FileUtils.deleteDirectory(new File(tempDirectory));
+  }
+
+  @AfterEach
+  public void resetSchema() throws InterruptedException {
+    catalog.asSchemas().dropSchema(schemaName, true);
+    createSchema();
+  }
+
+  @Test
+  public void testCreateDeltaTableAndRegisterToGravitino() throws Exception {
+    String tableLocation = tempDirectory + "/" + tableName;
+
+    // Step 1: Create a physical Delta table using Delta Kernel
+    StructType schema =
+        new StructType().add("id", IntegerType.INTEGER, true).add("name", StringType.STRING, true);
+
+    TransactionBuilder txnBuilder =
+        io.delta.kernel.Table.forPath(deltaEngine, tableLocation)
+            .createTransactionBuilder(deltaEngine, "test", Operation.CREATE_TABLE);
+
+    txnBuilder
+        .withSchema(deltaEngine, schema)
+        .withPartitionColumns(deltaEngine, Collections.emptyList())
+        .build(deltaEngine)
+        .commit(deltaEngine, emptyRowIterable());
+
+    LOG.info("Created Delta table at: {}", tableLocation);
+
+    // Step 2: Register the Delta table in Gravitino catalog
+    Column[] gravitinoColumns =
+        new Column[] {
+          Column.of(COL_NAME1, Types.IntegerType.get(), "id column"),
+          Column.of(COL_NAME2, Types.StringType.get(), "name column")
+        };
+
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Table gravitinoTable =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier,
+                gravitinoColumns,
+                TABLE_COMMENT,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+
+    Assertions.assertEquals(tableName, gravitinoTable.name());
+    Assertions.assertEquals(TABLE_COMMENT, gravitinoTable.comment());
+    LOG.info("Registered Delta table in Gravitino catalog");
+
+    // Step 3: Load table metadata from Gravitino
+    Table loadedTable = catalog.asTableCatalog().loadTable(nameIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+    Assertions.assertEquals(2, loadedTable.columns().length);
+
+    // Note: Gravitino may normalize the location by adding trailing slash
+    String locationFromMetadata = loadedTable.properties().get(Table.PROPERTY_LOCATION);
+    Assertions.assertTrue(
+        locationFromMetadata.equals(tableLocation)
+            || locationFromMetadata.equals(tableLocation + "/"),
+        "Location should match with or without trailing slash");
+
+    // Step 4: Use the location from Gravitino metadata to read actual Delta table
+    Assertions.assertNotNull(locationFromMetadata);
+
+    // Read Delta table using Delta Kernel
+    io.delta.kernel.Table deltaTable =
+        io.delta.kernel.Table.forPath(deltaEngine, locationFromMetadata);
+    Snapshot snapshot = deltaTable.getLatestSnapshot(deltaEngine);
+    Assertions.assertNotNull(snapshot);
+
+    StructType deltaSchema = snapshot.getSchema(deltaEngine);
+    Assertions.assertEquals(2, deltaSchema.fields().size());
+    Assertions.assertEquals(COL_NAME1, deltaSchema.fields().get(0).getName());
+    Assertions.assertEquals(COL_NAME2, deltaSchema.fields().get(1).getName());
+
+    // Step 5: Drop table from Gravitino catalog (metadata only)
+    boolean dropped = catalog.asTableCatalog().dropTable(nameIdentifier);
+    Assertions.assertTrue(dropped);
+
+    // Step 6: Verify Delta table still exists at location and can be accessed
+    io.delta.kernel.Table deltaTableAfterDrop =
+        io.delta.kernel.Table.forPath(deltaEngine, locationFromMetadata);
+    Snapshot snapshotAfterDrop = deltaTableAfterDrop.getLatestSnapshot(deltaEngine);
+    Assertions.assertNotNull(snapshotAfterDrop);
+    Assertions.assertEquals(2, snapshotAfterDrop.getSchema(deltaEngine).fields().size());
+  }
+
+  @Test
+  public void testCreateDeltaTableWithoutExternalFails() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        Transforms.EMPTY_TRANSFORM,
+                        null,
+                        null));

Review Comment:
   Using `assertThrows(Exception.class, ...)` is overly broad and can hide 
unexpected failures (e.g., runtime errors unrelated to the validation being 
tested). Since the Delta external constraint is enforced via argument 
validation, assert the specific exception type (typically 
`IllegalArgumentException`) and, if possible, assert the full message or a 
stable substring.
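
A small self-contained sketch of why the broad form is risky. It does not use JUnit: `expectThrows` is a hypothetical stand-in that mirrors the contract of `Assertions.assertThrows`, and `validateDeltaProperties` with its message text is an assumed placeholder for the catalog's real validation. A setup bug that surfaces as a `NullPointerException` satisfies the `Exception.class` assertion but fails the `IllegalArgumentException.class` one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the validation method and its message are assumptions.
class NarrowAssertThrowsSketch {

  /** Placeholder for the "external must be true" argument check. */
  static void validateDeltaProperties(Map<String, String> properties) {
    if (!"true".equalsIgnoreCase(properties.get("external"))) {
      throw new IllegalArgumentException("Gravitino only supports external Delta tables");
    }
  }

  /** Minimal stand-in for JUnit's assertThrows: passes only if `expected` (or a subtype) is thrown. */
  static <T extends Throwable> T expectThrows(Class<T> expected, Runnable action) {
    try {
      action.run();
    } catch (Throwable actual) {
      if (expected.isInstance(actual)) {
        return expected.cast(actual);
      }
      throw new AssertionError(
          "Expected " + expected.getName() + " but got " + actual.getClass().getName(), actual);
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  public static void main(String[] args) {
    // Intended case: the "external" property is missing, so validation rejects the request.
    IllegalArgumentException rejected =
        expectThrows(
            IllegalArgumentException.class, () -> validateDeltaProperties(new HashMap<>()));
    System.out.println("validation failure: " + rejected.getMessage());

    // Simulated test bug: a null map triggers a NullPointerException before any validation.
    // The broad assertion still passes, so the bug goes unnoticed.
    Exception masked = expectThrows(Exception.class, () -> validateDeltaProperties(null));
    System.out.println("broad assertion accepted: " + masked.getClass().getSimpleName());

    // The narrow assertion reports the same bug instead of hiding it.
    try {
      expectThrows(IllegalArgumentException.class, () -> validateDeltaProperties(null));
    } catch (AssertionError detected) {
      System.out.println("narrow assertion failed: " + detected.getMessage());
    }
  }
}
```

In the integration test itself, the equivalent change would be swapping `Exception.class` for the concrete type the server-side validation surfaces through the client, which should be confirmed against the actual failure before hard-coding it.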



##########
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/integration/test/CatalogGenericCatalogDeltaIT.java:
##########
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta.integration.test;
+
+import com.google.common.collect.Maps;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.delta.DeltaConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for Delta table support in Gravitino generic lakehouse catalog.
+ *
+ * <p>These tests verify:
+ *
+ * <ul>
+ *   <li>Creating a physical Delta table using Delta Kernel
+ *   <li>Registering the Delta table in Gravitino catalog
+ *   <li>Loading table metadata from Gravitino
+ *   <li>Reading actual Delta table using location from Gravitino metadata
+ *   <li>Verifying table still exists after dropping from Gravitino (metadata-only drop)
+ * </ul>
+ */
+public class CatalogGenericCatalogDeltaIT extends BaseIT {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogGenericCatalogDeltaIT.class);
+  public static final String METALAKE_NAME =
+      GravitinoITUtils.genRandomName("CatalogGenericDeltaIT_metalake");
+
+  public String catalogName = GravitinoITUtils.genRandomName("CatalogGenericDeltaIT_catalog");
+  public String SCHEMA_PREFIX = "CatalogGenericDelta_schema";
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  public String TABLE_PREFIX = "CatalogGenericDelta_table";
+  public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+  public static final String TABLE_COMMENT = "Delta table comment";
+  public static final String COL_NAME1 = "id";
+  public static final String COL_NAME2 = "name";
+  protected final String provider = "lakehouse-generic";
+  protected GravitinoMetalake metalake;
+  protected Catalog catalog;
+  protected String tempDirectory;
+  protected Engine deltaEngine;
+
+  @BeforeAll
+  public void startup() throws Exception {
+    createMetalake();
+    createCatalog();
+    createSchema();
+
+    Path tempDir = Files.createTempDirectory("deltaTempDir");
+    tempDirectory = tempDir.toString();
+
+    deltaEngine = DefaultEngine.create(new Configuration());
+  }
+
+  @AfterAll
+  public void stop() throws IOException {
+    if (client != null) {
+      Arrays.stream(catalog.asSchemas().listSchemas())
+          .filter(schema -> !schema.equals("default"))
+          .forEach(
+              (schema -> {
+                catalog.asSchemas().dropSchema(schema, true);
+              }));
+      Arrays.stream(metalake.listCatalogs())
+          .forEach(
+              catalogName -> {
+                metalake.dropCatalog(catalogName, true);
+              });
+      client.dropMetalake(METALAKE_NAME, true);
+    }
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close CloseableGroup", e);
+    }
+
+    client = null;
+
+    FileUtils.deleteDirectory(new File(tempDirectory));
+  }
+
+  @AfterEach
+  public void resetSchema() throws InterruptedException {
+    catalog.asSchemas().dropSchema(schemaName, true);
+    createSchema();
+  }
+
+  @Test
+  public void testCreateDeltaTableAndRegisterToGravitino() throws Exception {
+    String tableLocation = tempDirectory + "/" + tableName;
+
+    // Step 1: Create a physical Delta table using Delta Kernel
+    StructType schema =
+        new StructType().add("id", IntegerType.INTEGER, true).add("name", StringType.STRING, true);
+
+    TransactionBuilder txnBuilder =
+        io.delta.kernel.Table.forPath(deltaEngine, tableLocation)
+            .createTransactionBuilder(deltaEngine, "test", Operation.CREATE_TABLE);
+
+    txnBuilder
+        .withSchema(deltaEngine, schema)
+        .withPartitionColumns(deltaEngine, Collections.emptyList())
+        .build(deltaEngine)
+        .commit(deltaEngine, emptyRowIterable());
+
+    LOG.info("Created Delta table at: {}", tableLocation);
+
+    // Step 2: Register the Delta table in Gravitino catalog
+    Column[] gravitinoColumns =
+        new Column[] {
+          Column.of(COL_NAME1, Types.IntegerType.get(), "id column"),
+          Column.of(COL_NAME2, Types.StringType.get(), "name column")
+        };
+
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Table gravitinoTable =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier,
+                gravitinoColumns,
+                TABLE_COMMENT,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+
+    Assertions.assertEquals(tableName, gravitinoTable.name());
+    Assertions.assertEquals(TABLE_COMMENT, gravitinoTable.comment());
+    LOG.info("Registered Delta table in Gravitino catalog");
+
+    // Step 3: Load table metadata from Gravitino
+    Table loadedTable = catalog.asTableCatalog().loadTable(nameIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+    Assertions.assertEquals(2, loadedTable.columns().length);
+
+    // Note: Gravitino may normalize the location by adding trailing slash
+    String locationFromMetadata = loadedTable.properties().get(Table.PROPERTY_LOCATION);
+    Assertions.assertTrue(
+        locationFromMetadata.equals(tableLocation)
+            || locationFromMetadata.equals(tableLocation + "/"),
+        "Location should match with or without trailing slash");
+
+    // Step 4: Use the location from Gravitino metadata to read actual Delta table
+    Assertions.assertNotNull(locationFromMetadata);
+
+    // Read Delta table using Delta Kernel
+    io.delta.kernel.Table deltaTable =
+        io.delta.kernel.Table.forPath(deltaEngine, locationFromMetadata);
+    Snapshot snapshot = deltaTable.getLatestSnapshot(deltaEngine);
+    Assertions.assertNotNull(snapshot);
+
+    StructType deltaSchema = snapshot.getSchema(deltaEngine);
+    Assertions.assertEquals(2, deltaSchema.fields().size());
+    Assertions.assertEquals(COL_NAME1, deltaSchema.fields().get(0).getName());
+    Assertions.assertEquals(COL_NAME2, deltaSchema.fields().get(1).getName());
+
+    // Step 5: Drop table from Gravitino catalog (metadata only)
+    boolean dropped = catalog.asTableCatalog().dropTable(nameIdentifier);
+    Assertions.assertTrue(dropped);
+
+    // Step 6: Verify Delta table still exists at location and can be accessed
+    io.delta.kernel.Table deltaTableAfterDrop =
+        io.delta.kernel.Table.forPath(deltaEngine, locationFromMetadata);
+    Snapshot snapshotAfterDrop = deltaTableAfterDrop.getLatestSnapshot(deltaEngine);
+    Assertions.assertNotNull(snapshotAfterDrop);
+    Assertions.assertEquals(2, snapshotAfterDrop.getSchema(deltaEngine).fields().size());
+  }
+
+  @Test
+  public void testCreateDeltaTableWithoutExternalFails() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        Transforms.EMPTY_TRANSFORM,
+                        null,
+                        null));
+
+    Assertions.assertTrue(exception.getMessage().contains("external Delta tables"));
+  }
+
+  @Test
+  public void testCreateDeltaTableWithoutLocationFails() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        Transforms.EMPTY_TRANSFORM,
+                        null,
+                        null));
+

Review Comment:
   Same issue here: `assertThrows(Exception.class, ...)` is too broad for a 
validation test and may allow unrelated exceptions to satisfy the assertion. 
Prefer asserting `IllegalArgumentException` (or the concrete expected type) to 
make the test signal more reliable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]