This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 419f72e9ab [#9647]feat(catalog-lakehouse-generic): Add external Delta table support for generic lakehouse catalog (#9678)
419f72e9ab is described below

commit 419f72e9abbd909d98c74a386d2a2d490e8ababd
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Feb 10 14:06:20 2026 +0800

    [#9647]feat(catalog-lakehouse-generic): Add external Delta table support for generic lakehouse catalog (#9678)
    
    ### What changes were proposed in this pull request?
    
    This commit implements support for external Delta Lake tables in the
    generic lakehouse catalog, allowing users to register and manage
    metadata for existing Delta tables.
    
    Features:
    - External Delta table registration and metadata management
    - Schema stored from user CREATE TABLE request
    - Metadata-only DROP operation (preserves physical data)
    - Comprehensive validation and error messages
    - Integration with Delta Kernel 3.3.0 for table creation
    
    Implementation:
    - DeltaConstants: Delta table format constant
    - DeltaTableDelegator: ServiceLoader integration for Delta format
    - DeltaTableOperations: Table lifecycle operations (242 lines)
      * CREATE: Requires external=true and location properties
      * LOAD: Retrieves metadata from entity store
      * DROP: Removes metadata only, preserves data
      * ALTER: Not supported (throws UnsupportedOperationException)
      * PURGE: Not supported for external tables
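
The CREATE rules listed above can be illustrated with a minimal, self-contained sketch. It is not the Gravitino implementation: the class name `ExternalDeltaCreateCheck` and the method `validate` are hypothetical, and the literal keys `external` and `location` are assumed to correspond to the standard table properties named in this commit.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the CREATE property checks; not Gravitino code. */
public class ExternalDeltaCreateCheck {

  /** Returns the validated location, or throws IllegalArgumentException. */
  static String validate(Map<String, String> properties) {
    // Treat a null property map as empty so both checks below fail cleanly.
    Map<String, String> props = properties == null ? Collections.emptyMap() : properties;

    // Rule 1: the table must be explicitly marked external (case-insensitive "true").
    if (!"true".equalsIgnoreCase(props.get("external"))) {
      throw new IllegalArgumentException(
          "Only external Delta tables are supported; set property 'external=true'");
    }

    // Rule 2: a non-blank location must point at the existing Delta table.
    String location = props.get("location");
    if (location == null || location.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "Property 'location' is required for external Delta tables");
    }
    return location;
  }

  public static void main(String[] args) {
    // Accepted: both required properties are present.
    System.out.println(validate(Map.of("external", "true", "location", "/tmp/delta/events")));

    // Rejected: the external flag is missing.
    try {
      validate(Map.of("location", "/tmp/delta/events"));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Checking the external flag before the location means a request that omits both properties is told first that only external tables are supported.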
    
    Testing:
    - 4 unit tests in TestDeltaTableOperations
    - 7 integration tests in CatalogGenericCatalogDeltaIT
      * Physical Delta table creation with Delta Kernel
      * Registration and metadata operations
      * Validation of external-only constraint
      * Verification of metadata-only drop behavior
    
    Documentation:
    - lakehouse-generic-delta-table.md: Complete user guide
      * Table operations and examples
      * Data type mappings
      * Troubleshooting and best practices
      * Integration with Spark and Delta Lake APIs
    
    Dependencies:
    - Delta Kernel API 3.3.0 and defaults
    - Hadoop 3.x for Configuration
    
    Limitations:
    - External tables only (managed tables require Delta 4.0
    CommitCoordinator)
    - ALTER not supported (use Delta Lake APIs directly)
    - Schema validation not enforced (user responsibility)
    - Partitioning informational only (managed by Delta log)
    
    ### Why are the changes needed?
    
    Fix: #9647
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new unit tests (UT) and integration tests (IT).
---
 .../main/java/org/apache/gravitino/rel/Table.java  |   1 +
 .../catalog-lakehouse-generic/build.gradle.kts     |   3 +
 .../catalog/lakehouse/delta/DeltaConstants.java    |  35 ++
 .../lakehouse/delta/DeltaTableDelegator.java       |  76 +++
 .../lakehouse/delta/DeltaTableOperations.java      | 268 +++++++++
 .../generic/GenericTablePropertiesMetadata.java    |  15 +-
 ...talog.lakehouse.generic.LakehouseTableDelegator |   1 +
 .../lakehouse/delta/TestDeltaTableOperations.java  | 302 ++++++++++
 .../test/CatalogGenericCatalogDeltaIT.java         | 631 +++++++++++++++++++++
 docs/lakehouse-generic-delta-table.md              | 390 +++++++++++++
 gradle/libs.versions.toml                          |   3 +
 11 files changed, 1723 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/rel/Table.java b/api/src/main/java/org/apache/gravitino/rel/Table.java
index d9b95c5210..1e1f5f63fb 100644
--- a/api/src/main/java/org/apache/gravitino/rel/Table.java
+++ b/api/src/main/java/org/apache/gravitino/rel/Table.java
@@ -57,6 +57,7 @@ public interface Table extends Auditable {
    *
    * <ul>
    *   <li>lance
+   *   <li>Delta
    * </ul>
    */
   String PROPERTY_TABLE_FORMAT = "format";
diff --git a/catalogs/catalog-lakehouse-generic/build.gradle.kts b/catalogs/catalog-lakehouse-generic/build.gradle.kts
index a3d68b8451..e2e686f1cb 100644
--- a/catalogs/catalog-lakehouse-generic/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-generic/build.gradle.kts
@@ -54,6 +54,9 @@ dependencies {
   testImplementation(project(":server-common"))
 
   testImplementation(libs.awaitility)
+  testImplementation(libs.delta.kernel)
+  testImplementation(libs.delta.kernel.defaults)
+  testImplementation(libs.hadoop3.common)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.lance) // Included in the test runtime classpath for test only
diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaConstants.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaConstants.java
new file mode 100644
index 0000000000..2a5599dab6
--- /dev/null
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaConstants.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta;
+
+/**
+ * Constants for Delta Lake table format support in Gravitino lakehouse catalog.
+ *
+ * <p>This class defines constants used for managing external Delta tables, including table format
+ * identifiers.
+ */
+public class DeltaConstants {
+
+  /** The table format identifier for Delta Lake tables. */
+  public static final String DELTA_TABLE_FORMAT = "delta";
+
+  private DeltaConstants() {
+    // Prevent instantiation
+  }
+}
diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaTableDelegator.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaTableDelegator.java
new file mode 100644
index 0000000000..4f9f592b52
--- /dev/null
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaTableDelegator.java
@@ -0,0 +1,76 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta;
+
+import static org.apache.gravitino.catalog.lakehouse.delta.DeltaConstants.DELTA_TABLE_FORMAT;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * Delegator for Delta Lake table format in Gravitino lakehouse catalog.
+ *
+ * <p>This delegator provides table operations specific to external Delta Lake tables. It enables
+ * Gravitino to register and manage metadata for existing Delta tables without creating or modifying
+ * the underlying Delta table data.
+ *
+ * <p>Key features:
+ *
+ * <ul>
+ *   <li>Supports external Delta table registration (requires {@code external=true} property)
+ *   <li>Schema is provided by user in CREATE TABLE request (not read from Delta log)
+ *   <li>Uses standard table properties ({@code location}, {@code external}) - no Delta-specific
+ *       properties needed
+ *   <li>Drop operations only remove metadata, preserving the Delta table data
+ *   <li>No ALTER TABLE or PURGE support (external tables should be modified directly)
+ * </ul>
+ */
+public class DeltaTableDelegator implements LakehouseTableDelegator {
+
+  @Override
+  public String tableFormat() {
+    return DELTA_TABLE_FORMAT;
+  }
+
+  /**
+   * Returns Delta-specific table property entries.
+   *
+   * <p>Delta tables use standard table properties ({@link Table#PROPERTY_LOCATION} and {@link
+   * Table#PROPERTY_EXTERNAL}), so no Delta-specific properties are needed.
+   *
+   * @return an empty list since all required properties are standard table properties
+   */
+  @Override
+  public List<PropertyEntry<?>> tablePropertyEntries() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ManagedTableOperations createTableOps(
+      EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator idGenerator) {
+    return new DeltaTableOperations(store, schemaOps, idGenerator);
+  }
+}
diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaTableOperations.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaTableOperations.java
new file mode 100644
index 0000000000..f2b0f2ec55
--- /dev/null
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/delta/DeltaTableOperations.java
@@ -0,0 +1,268 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * Table operations for Delta Lake tables in Gravitino lakehouse catalog.
+ *
+ * <p>This class handles the lifecycle of external Delta Lake table metadata in Gravitino. It
+ * focuses on metadata management only and does not interact with the actual Delta table data or
+ * Delta log files.
+ *
+ * <p><b>Supported Operations:</b>
+ *
+ * <ul>
+ *   <li><b>Create Table:</b> Registers an external Delta table by storing its schema and location
+ *       in Gravitino's metadata store. Requires {@code external=true} property.
+ *   <li><b>Load Table:</b> Retrieves table metadata from Gravitino's metadata store
+ *   <li><b>Drop Table:</b> Removes metadata only, preserving the physical Delta table data
+ * </ul>
+ *
+ * <p><b>Unsupported Operations:</b>
+ *
+ * <ul>
+ *   <li><b>Alter Table:</b> Not supported; users should modify the Delta table directly using Delta
+ *       Lake APIs, then optionally recreate the catalog entry with updated schema
+ *   <li><b>Purge Table:</b> Not supported for external tables; data lifecycle is managed externally
+ * </ul>
+ *
+ * <p><b>Design Decisions:</b>
+ *
+ * <ul>
+ *   <li>Only supports external tables ({@code external=true} must be explicitly set)
+ *   <li>Schema comes from CREATE TABLE request (not validated against Delta log)
+ *   <li>User is responsible for ensuring schema accuracy matches the actual Delta table
+ *   <li>Supports identity partitioning as metadata only (user must ensure it matches actual Delta
+ *       table)
+ *   <li>Non-identity transforms (bucket, truncate, etc.), distribution, sort orders, and indexes
+ *       are not supported
+ * </ul>
+ */
+public class DeltaTableOperations extends ManagedTableOperations {
+
+  private final EntityStore store;
+  private final ManagedSchemaOperations schemaOps;
+  private final IdGenerator idGenerator;
+
+  public DeltaTableOperations(
+      EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator idGenerator) {
+    this.store = store;
+    this.schemaOps = schemaOps;
+    this.idGenerator = idGenerator;
+  }
+
+  @Override
+  protected EntityStore store() {
+    return store;
+  }
+
+  @Override
+  protected SupportsSchemas schemas() {
+    return schemaOps;
+  }
+
+  @Override
+  protected IdGenerator idGenerator() {
+    return idGenerator;
+  }
+
+  /**
+   * Creates an external Delta table by registering its metadata in Gravitino.
+   *
+   * <p>This method validates that the table is explicitly marked as external and has a valid
+   * location, then stores the metadata in Gravitino's entity store. It does not create or modify
+   * the actual Delta table data.
+   *
+   * <p><b>Required Properties:</b>
+   *
+   * <ul>
+   *   <li>{@code external=true} - Must be explicitly set to create external Delta tables
+   *   <li>{@code location} - Storage path of the existing Delta table
+   * </ul>
+   *
+   * <p><b>Supported Partitioning:</b>
+   *
+   * <ul>
+   *   <li>Identity transforms only (e.g., {@code Transforms.identity("column_name")})
+   *   <li>Partition information is stored as metadata only
+   *   <li>User is responsible for ensuring partition metadata matches the actual Delta table
+   *   <li>Non-identity transforms (bucket, truncate, year, month, etc.) are not supported
+   * </ul>
+   *
+   * <p><b>Disallowed Parameters:</b>
+   *
+   * <ul>
+   *   <li>Distribution - Not applicable for external Delta tables
+   *   <li>Sort orders - Not applicable for external Delta tables
+   *   <li>Indexes - Not applicable for external Delta tables
+   * </ul>
+   *
+   * @param ident the table identifier
+   * @param columns the table columns (schema provided by user)
+   * @param comment the table comment
+   * @param properties the table properties (must include {@code external=true} and {@code
+   *     location})
+   * @param partitions the partitioning (optional, identity transforms only)
+   * @param distribution the distribution (must be NONE or null)
+   * @param sortOrders the sort orders (must be empty or null)
+   * @param indexes the indexes (must be empty or null)
+   * @return the created table metadata
+   * @throws NoSuchSchemaException if the schema does not exist
+   * @throws TableAlreadyExistsException if the table already exists
+   * @throws IllegalArgumentException if {@code external=true} is not set, location is missing,
+   *     non-identity partitions are specified, or distribution, sort orders, or indexes are
+   *     specified
+   */
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    Map<String, String> nonNullProps = properties == null ? Collections.emptyMap() : properties;
+
+    // Validate that the table is explicitly marked as external
+    Preconditions.checkArgument(
+        nonNullProps.containsKey(Table.PROPERTY_EXTERNAL)
+            && "true".equalsIgnoreCase(nonNullProps.get(Table.PROPERTY_EXTERNAL)),
+        "Gravitino only supports creating external Delta tables"
+            + " for now. Please set property 'external=true' when creating Delta tables.");
+
+    // Validate required location property
+    String location = nonNullProps.get(Table.PROPERTY_LOCATION);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(location),
+        "Property '%s' is required for external Delta tables. Please specify the"
+            + " Delta table location.",
+        Table.PROPERTY_LOCATION);
+
+    // Validate partition transforms - only identity transforms are allowed
+    Transform[] validatedPartitions = Transforms.EMPTY_TRANSFORM;
+    if (partitions != null && partitions.length > 0) {
+      for (Transform partition : partitions) {
+        Preconditions.checkArgument(
+            partition instanceof Transform.SingleFieldTransform
+                && "identity".equalsIgnoreCase(partition.name()),
+            "Delta table only supports identity partitioning. "
+                + "Non-identity transforms (bucket, truncate, year, month, etc.) are not supported. "
+                + "Invalid transform: %s",
+            partition.name());
+      }
+      validatedPartitions = partitions;
+    }
+
+    Preconditions.checkArgument(
+        distribution == null || distribution.equals(Distributions.NONE),
+        "Delta table doesn't support specifying distribution in CREATE TABLE. "
+            + "Distribution is not applicable for external Delta tables.");
+
+    Preconditions.checkArgument(
+        sortOrders == null || sortOrders.length == 0,
+        "Delta table doesn't support specifying sort orders in CREATE TABLE. "
+            + "Sort orders are not applicable for external Delta tables.");
+
+    Preconditions.checkArgument(
+        indexes == null || indexes.length == 0,
+        "Delta table doesn't support specifying indexes in CREATE TABLE. "
+            + "Indexes are not applicable for external Delta tables.");
+
+    // Store metadata in entity store (schema from user request)
+    return super.createTable(
+        ident,
+        columns,
+        comment,
+        nonNullProps,
+        validatedPartitions,
+        Distributions.NONE,
+        SortOrders.NONE,
+        Indexes.EMPTY_INDEXES);
+  }
+
+  /**
+   * Alters a Delta table.
+   *
+   * <p>This operation is not supported for external Delta tables. Users should modify the Delta
+   * table directly using Delta Lake APIs or tools. If the schema changes, the table entry in
+   * Gravitino can be dropped and recreated with the updated schema.
+   *
+   * @param ident the table identifier
+   * @param changes the table changes to apply
+   * @return never returns (always throws exception)
+   * @throws UnsupportedOperationException always thrown as ALTER is not supported
+   */
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    throw new UnsupportedOperationException(
+        "ALTER TABLE operations are not supported for external Delta tables. "
+            + "Please modify the Delta table directly using Delta Lake APIs or tools, "
+            + "then DROP and recreate the table entry in Gravitino with the updated schema if"
+            + " needed.");
+  }
+
+  /**
+   * Purges a Delta table (removes both metadata and data).
+   *
+   * <p>This operation is not supported for external Delta tables. External table data is managed
+   * outside Gravitino, so purging is not applicable. Use {@link #dropTable(NameIdentifier)} to
+   * remove only the metadata, leaving the Delta table data intact.
+   *
+   * @param ident the table identifier
+   * @return never returns (always throws exception)
+   * @throws UnsupportedOperationException always thrown as PURGE is not supported for external
+   *     tables
+   */
+  @Override
+  public boolean purgeTable(NameIdentifier ident) {
+    throw new UnsupportedOperationException(
+        "Purge operation is not supported for external Delta tables. "
+            + "External table data is managed outside of Gravitino. "
+            + "Use dropTable() to remove metadata only, preserving the Delta table data.");
+  }
+}
diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
index 14cd263ecd..ae605f6c56 100644
--- a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.catalog.lakehouse.generic;
 
+import static org.apache.gravitino.connector.PropertyEntry.booleanPropertyEntry;
 import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
 import static org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;
 
@@ -40,7 +41,9 @@ public class GenericTablePropertiesMetadata extends BasePropertiesMetadata {
         ImmutableList.of(
             stringOptionalPropertyEntry(
                 Table.PROPERTY_LOCATION,
-                "The root directory of the generic table.",
+                "The directory of the table. If this is not specified in the table"
+                    + " property, it will use the one in catalog / schema level and concatenate"
+                    + " with the table name. For external table, this property is required.",
                 false /* immutable */,
                 null, /* defaultValue */
                 false /* hidden */),
@@ -48,7 +51,15 @@ public class GenericTablePropertiesMetadata extends BasePropertiesMetadata {
                 Table.PROPERTY_TABLE_FORMAT,
                 "The format of the table",
                 true /* immutable */,
-                false /* hidden */));
+                false /* hidden */),
+            booleanPropertyEntry(
+                Table.PROPERTY_EXTERNAL,
+                "Whether the table is external or managed by the catalog.",
+                false /* required */,
+                true /* immutable */,
+                false /* defaultValue */,
+                false /* hidden */,
+                false /* reserved */));
 
     PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
   }
diff --git a/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
index 352813f4b1..cae6204b31 100644
--- a/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
+++ b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
@@ -16,4 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+org.apache.gravitino.catalog.lakehouse.delta.DeltaTableDelegator
 org.apache.gravitino.catalog.lakehouse.lance.LanceTableDelegator
diff --git a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/TestDeltaTableOperations.java b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/TestDeltaTableOperations.java
new file mode 100644
index 0000000000..438fe8ae5e
--- /dev/null
+++ b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/TestDeltaTableOperations.java
@@ -0,0 +1,302 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta;
+
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.IdGenerator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestDeltaTableOperations {
+
+  @TempDir private java.nio.file.Path tempDir;
+
+  private DeltaTableOperations deltaTableOps;
+  private EntityStore store;
+  private ManagedSchemaOperations schemaOps;
+  private IdGenerator idGenerator;
+
+  @BeforeEach
+  public void setUp() {
+    store = mock(EntityStore.class);
+    schemaOps = mock(ManagedSchemaOperations.class);
+    idGenerator = mock(IdGenerator.class);
+    deltaTableOps = new DeltaTableOperations(store, schemaOps, idGenerator);
+  }
+
+  @Test
+  public void testCreateTableValidationFailures() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    Column[] columns = new Column[] {Column.of("id", Types.IntegerType.get(), "id column")};
+    String location = tempDir.resolve("delta_table").toString();
+
+    // Test missing external property
+    Map<String, String> noExternal = Maps.newHashMap();
+    noExternal.put(Table.PROPERTY_LOCATION, location);
+    IllegalArgumentException e1 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, noExternal, new Transform[0], null, null, null));
+    Assertions.assertTrue(e1.getMessage().contains("external Delta tables"));
+    Assertions.assertTrue(e1.getMessage().contains("external=true"));
+
+    // Test external=false
+    Map<String, String> externalFalse = Maps.newHashMap();
+    externalFalse.put(Table.PROPERTY_LOCATION, location);
+    externalFalse.put(Table.PROPERTY_EXTERNAL, "false");
+    IllegalArgumentException e2 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, externalFalse, new Transform[0], null, null, null));
+    Assertions.assertTrue(e2.getMessage().contains("external Delta tables"));
+
+    // Test missing location
+    Map<String, String> noLocation = Maps.newHashMap();
+    noLocation.put(Table.PROPERTY_EXTERNAL, "true");
+    IllegalArgumentException e3 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, noLocation, new Transform[0], null, null, null));
+    Assertions.assertTrue(e3.getMessage().contains("location"));
+    Assertions.assertTrue(e3.getMessage().contains("required"));
+
+    // Test blank location
+    Map<String, String> blankLocation = Maps.newHashMap();
+    blankLocation.put(Table.PROPERTY_EXTERNAL, "true");
+    blankLocation.put(Table.PROPERTY_LOCATION, "  ");
+    IllegalArgumentException e4 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, blankLocation, new Transform[0], null, null, null));
+    Assertions.assertTrue(e4.getMessage().contains("location"));
+
+    // Test null properties
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            deltaTableOps.createTable(
+                ident, columns, null, null, new Transform[0], null, null, null));
+  }
+
+  @Test
+  public void testCreateTableWithIdentityPartitionsSucceeds() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    Column[] columns =
+        new Column[] {
+          Column.of("id", Types.IntegerType.get(), "id column"),
+          Column.of("region", Types.StringType.get(), "region column")
+        };
+    String location = tempDir.resolve("delta_table").toString();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+    properties.put(Table.PROPERTY_LOCATION, location);
+    StringIdentifier stringId = StringIdentifier.fromId(1L);
+    Map<String, String> newProperties = StringIdentifier.newPropertiesWithId(stringId, properties);
+
+    Transform[] partitions = new Transform[] {Transforms.identity("region")};
+
+    // Should NOT throw exception with identity partitions
+    // Note: We can't fully create the table with mocks, but validation should pass
+    Assertions.assertDoesNotThrow(
+        () -> {
+          deltaTableOps.createTable(
+              ident, columns, null, newProperties, partitions, null, null, null);
+        });
+  }
+
+  @Test
+  public void testCreateTableWithNonIdentityPartitionsThrowsException() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    Column[] columns =
+        new Column[] {
+          Column.of("id", Types.IntegerType.get(), "id column"),
+          Column.of("created_at", Types.DateType.get(), "created_at column")
+        };
+    String location = tempDir.resolve("delta_table").toString();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+    properties.put(Table.PROPERTY_LOCATION, location);
+
+    // Test bucket transform
+    Transform[] bucketPartitions = new Transform[] {Transforms.bucket(10, new String[] {"id"})};
+    IllegalArgumentException bucketException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, properties, bucketPartitions, null, null, null));
+    Assertions.assertTrue(bucketException.getMessage().contains("identity partitioning"));
+    Assertions.assertTrue(bucketException.getMessage().contains("bucket"));
+
+    // Test year transform
+    Transform[] yearPartitions = new Transform[] {Transforms.year("created_at")};
+    IllegalArgumentException yearException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, properties, yearPartitions, null, null, null));
+    Assertions.assertTrue(yearException.getMessage().contains("identity partitioning"));
+    Assertions.assertTrue(yearException.getMessage().contains("year"));
+
+    // Test truncate transform
+    Transform[] truncatePartitions = new Transform[] {Transforms.truncate(10, "id")};
+    IllegalArgumentException truncateException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, properties, truncatePartitions, null, null, null));
+    Assertions.assertTrue(truncateException.getMessage().contains("identity partitioning"));
+    Assertions.assertTrue(truncateException.getMessage().contains("truncate"));
+  }
+
+  @Test
+  public void testCreateTableWithDistributionThrowsException() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    Column[] columns = new Column[] {Column.of("id", Types.IntegerType.get(), "id column")};
+    String location = tempDir.resolve("delta_table").toString();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+    properties.put(Table.PROPERTY_LOCATION, location);
+
+    Distribution distribution = Distributions.hash(1, Transforms.identity("id"));
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, properties, null, distribution, null, null));
+
+    Assertions.assertTrue(exception.getMessage().contains("distribution"));
+    Assertions.assertTrue(exception.getMessage().contains("doesn't support"));
+    Assertions.assertTrue(exception.getMessage().contains("not applicable"));
+  }
+
+  @Test
+  public void testCreateTableWithSortOrdersThrowsException() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    Column[] columns = new Column[] {Column.of("id", Types.IntegerType.get(), "id column")};
+    String location = tempDir.resolve("delta_table").toString();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+    properties.put(Table.PROPERTY_LOCATION, location);
+
+    SortOrder[] sortOrders = new SortOrder[] {SortOrders.ascending(NamedReference.field("id"))};
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, properties, null, null, sortOrders, null));
+
+    Assertions.assertTrue(exception.getMessage().contains("sort orders"));
+    Assertions.assertTrue(exception.getMessage().contains("doesn't support"));
+    Assertions.assertTrue(exception.getMessage().contains("not applicable"));
+  }
+
+  @Test
+  public void testCreateTableWithIndexesThrowsException() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    Column[] columns = new Column[] {Column.of("id", Types.IntegerType.get(), "id column")};
+    String location = tempDir.resolve("delta_table").toString();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+    properties.put(Table.PROPERTY_LOCATION, location);
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"id"}})};
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                deltaTableOps.createTable(
+                    ident, columns, null, properties, null, null, null, indexes));
+
+    Assertions.assertTrue(exception.getMessage().contains("indexes"));
+    Assertions.assertTrue(exception.getMessage().contains("doesn't support"));
+    Assertions.assertTrue(exception.getMessage().contains("not applicable"));
+  }
+
+  @Test
+  public void testAlterTableThrowsException() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+    TableChange[] changes =
+        new TableChange[] {TableChange.addColumn(new String[] {"new_col"}, Types.StringType.get())};
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class, () -> deltaTableOps.alterTable(ident, changes));
+
+    Assertions.assertTrue(exception.getMessage().contains("ALTER TABLE"));
+    Assertions.assertTrue(exception.getMessage().contains("not supported"));
+    Assertions.assertTrue(exception.getMessage().contains("Delta Lake APIs"));
+  }
+
+  @Test
+  public void testPurgeTableThrowsException() {
+    NameIdentifier ident = NameIdentifier.of("catalog", "schema", "table");
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class, () -> deltaTableOps.purgeTable(ident));
+
+    Assertions.assertTrue(exception.getMessage().contains("Purge"));
+    Assertions.assertTrue(exception.getMessage().contains("not supported"));
+    Assertions.assertTrue(exception.getMessage().contains("external"));
+    Assertions.assertTrue(exception.getMessage().contains("dropTable()"));
+  }
+}
diff --git a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/integration/test/CatalogGenericCatalogDeltaIT.java b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/integration/test/CatalogGenericCatalogDeltaIT.java
new file mode 100644
index 0000000000..8dffe0c32b
--- /dev/null
+++ b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/delta/integration/test/CatalogGenericCatalogDeltaIT.java
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.delta.integration.test;
+
+import com.google.common.collect.Maps;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.delta.DeltaConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for Delta table support in Gravitino generic lakehouse catalog.
+ *
+ * <p>These tests verify:
+ *
+ * <ul>
+ *   <li>Creating a physical Delta table using Delta Kernel
+ *   <li>Registering the Delta table in Gravitino catalog
+ *   <li>Loading table metadata from Gravitino
+ *   <li>Reading actual Delta table using location from Gravitino metadata
+ *   <li>Verifying table still exists after dropping from Gravitino (metadata-only drop)
+ * </ul>
+ */
+public class CatalogGenericCatalogDeltaIT extends BaseIT {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogGenericCatalogDeltaIT.class);
+  public static final String METALAKE_NAME =
+      GravitinoITUtils.genRandomName("CatalogGenericDeltaIT_metalake");
+
+  public String catalogName = GravitinoITUtils.genRandomName("CatalogGenericDeltaIT_catalog");
+  public String SCHEMA_PREFIX = "CatalogGenericDelta_schema";
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  public String TABLE_PREFIX = "CatalogGenericDelta_table";
+  public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+  public static final String TABLE_COMMENT = "Delta table comment";
+  public static final String COL_NAME1 = "id";
+  public static final String COL_NAME2 = "name";
+  protected final String provider = "lakehouse-generic";
+  protected GravitinoMetalake metalake;
+  protected Catalog catalog;
+  protected String tempDirectory;
+  protected Engine deltaEngine;
+
+  @BeforeAll
+  public void startup() throws Exception {
+    createMetalake();
+    createCatalog();
+    createSchema();
+
+    Path tempDir = Files.createTempDirectory("deltaTempDir");
+    tempDirectory = tempDir.toString();
+
+    deltaEngine = DefaultEngine.create(new Configuration());
+  }
+
+  @AfterAll
+  public void stop() throws IOException {
+    if (client != null) {
+      Arrays.stream(catalog.asSchemas().listSchemas())
+          .filter(schema -> !schema.equals("default"))
+          .forEach(
+              (schema -> {
+                catalog.asSchemas().dropSchema(schema, true);
+              }));
+      Arrays.stream(metalake.listCatalogs())
+          .forEach(
+              catalogName -> {
+                metalake.dropCatalog(catalogName, true);
+              });
+      client.dropMetalake(METALAKE_NAME, true);
+    }
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close CloseableGroup", e);
+    }
+
+    client = null;
+
+    FileUtils.deleteDirectory(new File(tempDirectory));
+  }
+
+  @AfterEach
+  public void resetSchema() throws InterruptedException {
+    catalog.asSchemas().dropSchema(schemaName, true);
+    createSchema();
+  }
+
+  @Test
+  public void testCreateDeltaTableAndRegisterToGravitino() throws Exception {
+    String tableLocation = tempDirectory + "/" + tableName;
+
+    // Step 1: Create a physical Delta table using Delta Kernel
+    StructType schema =
+        new StructType().add("id", IntegerType.INTEGER, true).add("name", StringType.STRING, true);
+
+    TransactionBuilder txnBuilder =
+        io.delta.kernel.Table.forPath(deltaEngine, tableLocation)
+            .createTransactionBuilder(deltaEngine, "test", Operation.CREATE_TABLE);
+
+    txnBuilder
+        .withSchema(deltaEngine, schema)
+        .withPartitionColumns(deltaEngine, Collections.emptyList())
+        .build(deltaEngine)
+        .commit(deltaEngine, emptyRowIterable());
+
+    LOG.info("Created Delta table at: {}", tableLocation);
+
+    // Step 2: Register the Delta table in Gravitino catalog
+    Column[] gravitinoColumns =
+        new Column[] {
+          Column.of(COL_NAME1, Types.IntegerType.get(), "id column"),
+          Column.of(COL_NAME2, Types.StringType.get(), "name column")
+        };
+
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Table gravitinoTable =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier,
+                gravitinoColumns,
+                TABLE_COMMENT,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+
+    Assertions.assertEquals(tableName, gravitinoTable.name());
+    Assertions.assertEquals(TABLE_COMMENT, gravitinoTable.comment());
+    LOG.info("Registered Delta table in Gravitino catalog");
+
+    // Step 3: Load table metadata from Gravitino
+    Table loadedTable = catalog.asTableCatalog().loadTable(nameIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+    Assertions.assertEquals(2, loadedTable.columns().length);
+
+    // Note: Gravitino may normalize the location by adding a trailing slash
+    String locationFromMetadata = loadedTable.properties().get(Table.PROPERTY_LOCATION);
+    Assertions.assertNotNull(locationFromMetadata);
+    Assertions.assertTrue(
+        locationFromMetadata.equals(tableLocation)
+            || locationFromMetadata.equals(tableLocation + "/"),
+        "Location should match with or without trailing slash");
+
+    // Step 4: Use the location from Gravitino metadata to read the actual Delta table
+
+    // Read Delta table using Delta Kernel
+    io.delta.kernel.Table deltaTable =
+        io.delta.kernel.Table.forPath(deltaEngine, locationFromMetadata);
+    Snapshot snapshot = deltaTable.getLatestSnapshot(deltaEngine);
+    Assertions.assertNotNull(snapshot);
+
+    StructType deltaSchema = snapshot.getSchema(deltaEngine);
+    Assertions.assertEquals(2, deltaSchema.fields().size());
+    Assertions.assertEquals(COL_NAME1, deltaSchema.fields().get(0).getName());
+    Assertions.assertEquals(COL_NAME2, deltaSchema.fields().get(1).getName());
+
+    // Step 5: Drop table from Gravitino catalog (metadata only)
+    boolean dropped = catalog.asTableCatalog().dropTable(nameIdentifier);
+    Assertions.assertTrue(dropped);
+
+    // Step 6: Verify Delta table still exists at location and can be accessed
+    io.delta.kernel.Table deltaTableAfterDrop =
+        io.delta.kernel.Table.forPath(deltaEngine, locationFromMetadata);
+    Snapshot snapshotAfterDrop = deltaTableAfterDrop.getLatestSnapshot(deltaEngine);
+    Assertions.assertNotNull(snapshotAfterDrop);
+    Assertions.assertEquals(2, snapshotAfterDrop.getSchema(deltaEngine).fields().size());
+  }
+
+  @Test
+  public void testCreateDeltaTableWithoutExternalFails() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+
+    Exception exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        Transforms.EMPTY_TRANSFORM,
+                        null,
+                        null));
+
+    Assertions.assertTrue(exception.getMessage().contains("external Delta tables"));
+  }
+
+  @Test
+  public void testCreateDeltaTableWithoutLocationFails() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Exception exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        Transforms.EMPTY_TRANSFORM,
+                        null,
+                        null));
+
+    Assertions.assertTrue(exception.getMessage().contains("location"));
+  }
+
+  @Test
+  public void testAlterDeltaTableFails() throws Exception {
+    String tableLocation = tempDirectory + "/" + tableName + "_alter";
+
+    // Create physical Delta table
+    StructType schema =
+        new StructType().add("id", IntegerType.INTEGER, true).add("name", StringType.STRING, true);
+
+    TransactionBuilder txnBuilder =
+        io.delta.kernel.Table.forPath(deltaEngine, tableLocation)
+            .createTransactionBuilder(deltaEngine, "test", Operation.CREATE_TABLE);
+
+    txnBuilder
+        .withSchema(deltaEngine, schema)
+        .withPartitionColumns(deltaEngine, Collections.emptyList())
+        .build(deltaEngine)
+        .commit(deltaEngine, emptyRowIterable());
+
+    // Register in Gravitino
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            nameIdentifier,
+            columns,
+            TABLE_COMMENT,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            null,
+            null);
+
+    TableChange addColumn = TableChange.addColumn(new String[] {"new_col"}, Types.StringType.get());
+
+    Exception exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () -> catalog.asTableCatalog().alterTable(nameIdentifier, addColumn));
+
+    Assertions.assertTrue(exception.getMessage().contains("ALTER TABLE"));
+    Assertions.assertTrue(exception.getMessage().contains("not supported"));
+  }
+
+  @Test
+  public void testPurgeDeltaTableFails() throws Exception {
+    String tableLocation = tempDirectory + "/" + tableName + "_purge";
+
+    // Create physical Delta table
+    StructType schema =
+        new StructType().add("id", IntegerType.INTEGER, true).add("name", StringType.STRING, true);
+
+    TransactionBuilder txnBuilder =
+        io.delta.kernel.Table.forPath(deltaEngine, tableLocation)
+            .createTransactionBuilder(deltaEngine, "test", Operation.CREATE_TABLE);
+
+    txnBuilder
+        .withSchema(deltaEngine, schema)
+        .withPartitionColumns(deltaEngine, Collections.emptyList())
+        .build(deltaEngine)
+        .commit(deltaEngine, emptyRowIterable());
+
+    // Register in Gravitino
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            nameIdentifier,
+            columns,
+            TABLE_COMMENT,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            null,
+            null);
+
+    Exception exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () -> catalog.asTableCatalog().purgeTable(nameIdentifier));
+
+    Assertions.assertTrue(exception.getMessage().contains("Purge"));
+    Assertions.assertTrue(exception.getMessage().contains("not supported"));
+  }
+
+  @Test
+  public void testCreateDeltaTableWithIdentityPartitionsSucceeds() throws Exception {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Transform[] partitions = new Transform[] {Transforms.identity("created_at")};
+
+    // Creating a Delta table with identity partitions should succeed
+    Table table =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier, columns, TABLE_COMMENT, properties, partitions, null, null, null);
+
+    Assertions.assertNotNull(table);
+    Assertions.assertEquals(1, table.partitioning().length);
+    Assertions.assertEquals("identity", table.partitioning()[0].name());
+
+    // Verify partition metadata is persisted
+    Table loadedTable = catalog.asTableCatalog().loadTable(nameIdentifier);
+    Assertions.assertEquals(1, loadedTable.partitioning().length);
+    Assertions.assertEquals("identity", loadedTable.partitioning()[0].name());
+
+    // Cleanup
+    catalog.asTableCatalog().dropTable(nameIdentifier);
+  }
+
+  @Test
+  public void testCreateDeltaTableWithNonIdentityPartitionsThrowsException() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    // Test bucket transform
+    Transform[] bucketPartitions =
+        new Transform[] {Transforms.bucket(10, new String[] {COL_NAME1})};
+    IllegalArgumentException bucketException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        bucketPartitions,
+                        null,
+                        null,
+                        null));
+
+    Assertions.assertTrue(bucketException.getMessage().contains("identity partitioning"));
+    Assertions.assertTrue(bucketException.getMessage().contains("bucket"));
+  }
+
+  @Test
+  public void testCreateDeltaTableWithDistributionThrowsException() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        null,
+                        Distributions.hash(5, NamedReference.field(COL_NAME1)),
+                        null,
+                        null));
+
+    Assertions.assertTrue(exception.getMessage().contains("distribution"));
+    Assertions.assertTrue(exception.getMessage().contains("doesn't support"));
+  }
+
+  @Test
+  public void testCreateDeltaTableWithSortOrdersThrowsException() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        null,
+                        null,
+                        new SortOrder[] {SortOrders.ascending(NamedReference.field(COL_NAME1))},
+                        null));
+
+    Assertions.assertTrue(exception.getMessage().contains("sort orders"));
+    Assertions.assertTrue(exception.getMessage().contains("doesn't support"));
+  }
+
+  @Test
+  public void testCreateDeltaTableWithIndexesThrowsException() {
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createTableProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put(Table.PROPERTY_TABLE_FORMAT, DeltaConstants.DELTA_TABLE_FORMAT);
+    properties.put(Table.PROPERTY_LOCATION, tableLocation);
+    properties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Index[] indexes = new Index[] {Indexes.primary("pk_id", new String[][] {{COL_NAME1}})};
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        nameIdentifier,
+                        columns,
+                        TABLE_COMMENT,
+                        properties,
+                        null,
+                        null,
+                        null,
+                        indexes));
+
+    Assertions.assertTrue(exception.getMessage().contains("indexes"));
+    Assertions.assertTrue(exception.getMessage().contains("doesn't support"));
+  }
+
+  protected Map<String, String> createSchemaProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    return properties;
+  }
+
+  private void createMetalake() {
+    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+    Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+    client.createMetalake(METALAKE_NAME, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(METALAKE_NAME);
+    Assertions.assertEquals(METALAKE_NAME, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  protected void createCatalog() {
+    Map<String, String> properties = Maps.newHashMap();
+    metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider, "comment", properties);
+
+    catalog = metalake.loadCatalog(catalogName);
+  }
+
+  private void createSchema() throws InterruptedException {
+    Map<String, String> schemaProperties = createSchemaProperties();
+    String comment = "schema comment";
+    catalog.asSchemas().createSchema(schemaName, comment, schemaProperties);
+    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+    Assertions.assertEquals(schemaName, loadSchema.name());
+    Assertions.assertEquals(comment, loadSchema.comment());
+    Assertions.assertEquals("val1", loadSchema.properties().get("key1"));
+    Assertions.assertEquals("val2", loadSchema.properties().get("key2"));
+  }
+
+  private Column[] createColumns() {
+    Column col1 = Column.of(COL_NAME1, Types.IntegerType.get(), "id column");
+    Column col2 = Column.of(COL_NAME2, Types.StringType.get(), "name column");
+    Column col3 = Column.of("created_at", Types.DateType.get(), "created_at column");
+    return new Column[] {col1, col2, col3};
+  }
+
+  protected Map<String, String> createTableProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    return properties;
+  }
+
+  /**
+   * Helper method to create an empty {@code CloseableIterable<Row>} for Delta Kernel transaction
+   * commits.
+   */
+  private static CloseableIterable<Row> emptyRowIterable() {
+    return new CloseableIterable<Row>() {
+
+      @Override
+      public CloseableIterator<Row> iterator() {
+        return new CloseableIterator<Row>() {
+          @Override
+          public void close() throws IOException {
+            // No resources to close
+          }
+
+          @Override
+          public boolean hasNext() {
+            return false;
+          }
+
+          @Override
+          public Row next() {
+            throw new NoSuchElementException("Empty iterator");
+          }
+        };
+      }
+
+      @Override
+      public void close() throws IOException {
+        // No resources to close
+      }
+    };
+  }
+}
diff --git a/docs/lakehouse-generic-delta-table.md b/docs/lakehouse-generic-delta-table.md
new file mode 100644
index 0000000000..e983f865e3
--- /dev/null
+++ b/docs/lakehouse-generic-delta-table.md
@@ -0,0 +1,390 @@
+---
+title: "Delta Lake table support"
+slug: /delta-table-support
+keywords:
+- lakehouse
+- delta
+- delta lake
+- metadata
+- generic catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+## Overview
+
+This document describes how to use Apache Gravitino to manage a generic lakehouse catalog using Delta Lake as the underlying table format. Gravitino supports registering and managing metadata for external Delta tables.
+
+:::info Current Support
+Gravitino currently supports **external Delta tables only**. This means:
+- You can register existing Delta tables in Gravitino
+- Gravitino manages metadata only (schema, location, properties)
+- The physical Delta table data remains independent
+- Dropping tables from Gravitino does not delete the underlying Delta data
+:::
+
+## Table Management
+
+### Supported Operations
+
+For Delta tables in a Generic Lakehouse Catalog, the following table summarizes supported operations:
+
+| Operation | Support Status                                 |
+|-----------|------------------------------------------------|
+| List      | ✅ Full                                         |
+| Load      | ✅ Full                                         |
+| Alter     | ❌ Not supported (use Delta Lake APIs directly) |
+| Create    | ✅ Register external tables only                |
+| Drop      | ✅ Metadata only (data preserved)               |
+| Purge     | ❌ Not supported for external tables            |
+
+:::note Feature Limitations
+- **External Tables Only:** Must set `external=true` when creating Delta tables
+- **Alter Operations:** Not supported; modify tables using Delta Lake APIs or Spark, then drop and re-register the table in Gravitino if its metadata needs updating
+- **Purge:** Not applicable for external tables; use DROP to remove metadata only
+- **Partitioning:** Identity partitions supported as metadata only; you must ensure consistency with the actual Delta table
+- **Sort Orders:** Not supported in CREATE TABLE
+- **Distributions:** Not supported in CREATE TABLE
+- **Indexes:** Not supported in CREATE TABLE
+:::
+
+### Data Type Mappings
+
+Delta Lake uses Apache Spark data types. The following table shows type mappings between Gravitino and Delta/Spark:
+
+| Gravitino Type      | Delta/Spark Type       | Notes                           |
+|---------------------|------------------------|---------------------------------|
+| `Boolean`           | `BooleanType`          |                                 |
+| `Byte`              | `ByteType`             |                                 |
+| `Short`             | `ShortType`            |                                 |
+| `Integer`           | `IntegerType`          |                                 |
+| `Long`              | `LongType`             |                                 |
+| `Float`             | `FloatType`            |                                 |
+| `Double`            | `DoubleType`           |                                 |
+| `Decimal(p, s)`     | `DecimalType(p, s)`    |                                 |
+| `String`            | `StringType`           |                                 |
+| `Binary`            | `BinaryType`           |                                 |
+| `Date`              | `DateType`             |                                 |
+| `Timestamp`         | `TimestampNTZType`     | No timezone, Spark 3.4+         |
+| `Timestamp_tz`      | `TimestampType`        | With timezone                   |
+| `List`              | `ArrayType`            |                                 |
+| `Map`               | `MapType`              |                                 |
+| `Struct`            | `StructType`           |                                 |
+
+### Table Properties
+
+Required and optional properties for Delta tables in a Generic Lakehouse Catalog:
+
+| Property   | Description | Default | Required | Since Version |
+|------------|-------------|---------|----------|---------------|
+| `format`   | Table format: must be `delta` | (none) | Yes | 1.2.0 |
+| `location` | Storage path for the Delta table. Must point to a directory containing Delta Lake metadata (_delta_log). Supports file://, s3://, hdfs://, abfs://, gs://, and other Hadoop-compatible file systems. | (none) | Yes | 1.2.0 |
+| `external` | Must be `true` for Delta tables. Indicates that Gravitino manages metadata only <br/>and will not delete physical data when the table is dropped. | (none) | Yes | 1.2.0 |
+
+**Location Requirement:** The location must be specified at the table level for external Delta tables. See [Location Resolution](./lakehouse-generic-catalog.md#key-property-location).
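
The property rules above can be checked on the client side before a CREATE TABLE request is sent. The following is a minimal sketch, not Gravitino's own validation: the class `DeltaTablePropertiesCheck` and its `validate` method are hypothetical names, and the matching rules used here (an exact `delta` string, `Boolean.parseBoolean` for `external`) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for Delta table properties; not part of Gravitino. */
final class DeltaTablePropertiesCheck {

  private DeltaTablePropertiesCheck() {}

  /** Returns a list of problems; an empty list means all three required properties look valid. */
  static List<String> validate(Map<String, String> properties) {
    List<String> problems = new ArrayList<>();

    // "format" is required and must be "delta" (exact match is an assumption).
    String format = properties.get("format");
    if (!"delta".equals(format)) {
      problems.add("format must be 'delta' but was: " + format);
    }

    // "external" is required and must be true; a missing value parses to false.
    if (!Boolean.parseBoolean(properties.get("external"))) {
      problems.add("external must be 'true' for Delta tables");
    }

    // "location" is required and must not be blank.
    String location = properties.get("location");
    if (location == null || location.trim().isEmpty()) {
      problems.add("location is required for external Delta tables");
    }

    return problems;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("format", "delta");
    properties.put("external", "true");
    // "location" is left out on purpose to show a reported problem.
    for (String problem : validate(properties)) {
      System.out.println(problem);
    }
  }
}
```

Returning a list rather than throwing on the first failure lets a caller report every missing property in one pass.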
+
+### Table Operations
+
+Table operations follow standard relational catalog patterns with Delta-specific considerations. See [Table Operations](./manage-relational-metadata-using-gravitino.md#table-operations) for comprehensive documentation.
+
+The following sections provide examples and important details for working with Delta tables.
+
+#### Registering an External Delta Table
+
+Register an existing Delta table in Gravitino without moving or modifying the underlying data:
+
+<Tabs groupId='language' queryString>
+<TabItem value="shell" label="Shell">
+
+```shell
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+  -H "Content-Type: application/json" -d '{
+  "name": "customer_orders",
+  "comment": "Customer orders Delta table",
+  "columns": [
+    {
+      "name": "order_id",
+      "type": "long",
+      "comment": "Order identifier",
+      "nullable": false
+    },
+    {
+      "name": "customer_id",
+      "type": "long",
+      "comment": "Customer identifier",
+      "nullable": false
+    },
+    {
+      "name": "order_date",
+      "type": "date",
+      "comment": "Order date",
+      "nullable": false
+    },
+    {
+      "name": "total_amount",
+      "type": "decimal(10,2)",
+      "comment": "Total order amount",
+      "nullable": true
+    }
+  ],
+  "properties": {
+    "format": "delta",
+    "external": "true",
+    "location": "s3://my-bucket/delta-tables/customer_orders"
+  }
+}' http://localhost:8090/api/metalakes/test/catalogs/generic_lakehouse_delta_catalog/schemas/sales/tables
+```
+
+</TabItem>
+<TabItem value="java" label="Java">
+
+```java
+import java.util.Map;
+
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.types.Types;
+import com.google.common.collect.ImmutableMap;
+
+Catalog catalog = gravitinoClient.loadCatalog("generic_lakehouse_delta_catalog");
+TableCatalog tableCatalog = catalog.asTableCatalog();
+
+Map<String, String> tableProperties = ImmutableMap.<String, String>builder()
+    .put("format", "delta")
+    .put("external", "true")
+    .put("location", "s3://my-bucket/delta-tables/customer_orders")
+    .build();
+
+tableCatalog.createTable(
+    NameIdentifier.of("sales", "customer_orders"),
+    new Column[] {
+        // Column.of(name, type, comment, nullable, autoIncrement, defaultValue)
+        Column.of("order_id", Types.LongType.get(), "Order identifier", false, false, null),
+        Column.of("customer_id", Types.LongType.get(), "Customer identifier", false, false, null),
+        Column.of("order_date", Types.DateType.get(), "Order date", false, false, null),
+        Column.of("total_amount", Types.DecimalType.of(10, 2), "Total order amount", true, false, null)
+    },
+    "Customer orders Delta table",
+    tableProperties,
+    null,  // partitions (optional, identity only)
+    null,  // distributions (not supported)
+    null,  // sortOrders (not supported)
+    null   // indexes (not supported)
+);
+```
+
+</TabItem>
+</Tabs>
+
+:::important Schema Specification
+When registering a Delta table in Gravitino, you must provide the schema (columns) in the CREATE TABLE request. Gravitino stores this schema as metadata but does not validate it against the Delta table's actual schema.
+
+**Best Practice:** Ensure the schema you provide matches the actual Delta table schema to avoid inconsistencies.
+:::
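
Because Gravitino does not validate the declared schema, a client-side comparison before registration can catch drift early. The following is a minimal sketch under stated assumptions: both schemas have already been reduced to maps from column name to a type string in the same notation (obtaining the actual schema, for example through Spark, is out of scope here), and `DeclaredSchemaCheck` and `diff` are hypothetical names that belong to neither Gravitino nor Delta Lake.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that compares a declared schema with the actual one; not part of Gravitino. */
final class DeclaredSchemaCheck {

  private DeclaredSchemaCheck() {}

  /** Returns one message per mismatch; an empty list means the two schemas agree. */
  static List<String> diff(Map<String, String> declared, Map<String, String> actual) {
    List<String> mismatches = new ArrayList<>();

    // Every declared column must exist in the actual table with the same type.
    for (Map.Entry<String, String> entry : declared.entrySet()) {
      String name = entry.getKey();
      String declaredType = entry.getValue();
      String actualType = actual.get(name);
      if (actualType == null) {
        mismatches.add("declared column missing from Delta table: " + name);
      } else if (!actualType.equalsIgnoreCase(declaredType)) {
        mismatches.add(
            "type mismatch for " + name + ": declared " + declaredType + ", actual " + actualType);
      }
    }

    // Columns that exist in the table but were not declared are reported as well.
    for (String name : actual.keySet()) {
      if (!declared.containsKey(name)) {
        mismatches.add("Delta table column not declared: " + name);
      }
    }

    return mismatches;
  }

  public static void main(String[] args) {
    Map<String, String> declared = new LinkedHashMap<>();
    declared.put("order_id", "long");
    declared.put("total_amount", "decimal(10,2)");

    Map<String, String> actual = new LinkedHashMap<>();
    actual.put("order_id", "long");
    actual.put("total_amount", "decimal(12,2)");
    actual.put("status", "string");

    for (String mismatch : diff(declared, actual)) {
      System.out.println(mismatch);
    }
  }
}
```

Comparing type strings keeps the sketch independent of any particular type system; a real check would need a mapping between Gravitino and Delta/Spark types such as the one in the Data Type Mappings table.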
+
+#### Loading a Delta Table
+
+<Tabs groupId='language' queryString>
+<TabItem value="shell" label="Shell">
+
+```shell
+curl -X GET -H "Accept: application/vnd.gravitino.v1+json" \
+  http://localhost:8090/api/metalakes/test/catalogs/generic_lakehouse_delta_catalog/schemas/sales/tables/customer_orders
+```
+
+</TabItem>
+<TabItem value="java" label="Java">
+
+```java
+Table table = tableCatalog.loadTable(
+    NameIdentifier.of("sales", "customer_orders")
+);
+
+System.out.println("Table location: " + table.properties().get("location"));
+System.out.println("Columns: " + Arrays.toString(table.columns()));
+```
+
+</TabItem>
+</Tabs>
+
+#### Dropping a Delta Table
+
+Dropping a Delta table from Gravitino removes only the metadata entry. The physical Delta table data remains intact.
+
+<Tabs groupId='language' queryString>
+<TabItem value="shell" label="Shell">
+
+```shell
+curl -X DELETE -H "Accept: application/vnd.gravitino.v1+json" \
+  http://localhost:8090/api/metalakes/test/catalogs/generic_lakehouse_delta_catalog/schemas/sales/tables/customer_orders
+```
+
+</TabItem>
+<TabItem value="java" label="Java">
+
+```java
+boolean dropped = tableCatalog.dropTable(
+    NameIdentifier.of("sales", "customer_orders")
+);
+// The Delta table files at the location are NOT deleted
+```
+
+</TabItem>
+</Tabs>
+
+:::tip Metadata-Only Drop
+Since Delta tables are external, dropping them from Gravitino:
+- ✅ Removes the table from Gravitino's metadata
+- ✅ Preserves the Delta table data at its location
+- ✅ Allows you to re-register the same table later
+
+The Delta table can still be accessed directly via Delta Lake APIs, Spark, or other tools.
+:::
+
+## Working with Delta Tables
+
+### Using Spark to Modify Delta Tables
+
+Since Gravitino does not support ALTER operations for Delta tables, use Apache Spark or other Delta Lake tools to modify table structure:
+
+```java
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import io.delta.tables.DeltaTable;
+import static org.apache.spark.sql.functions.lit;
+
+// Create Spark session with Delta Lake support
+SparkSession spark = SparkSession.builder()
+    .appName("Delta Table Modification")
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+    .getOrCreate();
+
+// Table location, hardcoded here for brevity; in practice, read it from the
+// "location" property of the table loaded from Gravitino
+String tableLocation = "s3://my-bucket/delta-tables/customer_orders";
+
+// Add a new column using Delta Lake
+DeltaTable deltaTable = DeltaTable.forPath(spark, tableLocation);
+Dataset<Row> df = deltaTable.toDF()
+    .withColumn("status", lit("pending"));
+
+df.write()
+    .format("delta")
+    .mode("overwrite")
+    .option("overwriteSchema", "true")
+    .save(tableLocation);
+```
+
+After modifying the Delta table, Gravitino still holds the schema supplied at registration. To bring its metadata back in sync, you can:
+1. Drop the table from Gravitino
+2. Re-register it with the updated schema
+
+### Reading Delta Tables via Gravitino
+
+Once registered in Gravitino, you can query Delta table metadata and use the location to read data:
+
+```java
+// Load table metadata from Gravitino
+Table table = tableCatalog.loadTable(NameIdentifier.of("sales", "customer_orders"));
+String location = table.properties().get("location");
+
+// Use the location to read the Delta table with Spark
+Dataset<Row> df = spark.read()
+    .format("delta")
+    .load(location);
+
+df.show();
+```
+
+### Partitioned Delta Tables
+
+Delta Lake supports partitioning, and Gravitino can store identity partition metadata for external Delta tables. The partition information is metadata-only and must match the actual Delta table's partitioning scheme defined in the Delta transaction log.
+
+```java
+// Register a partitioned Delta table
+Map<String, String> properties = ImmutableMap.<String, String>builder()
+    .put("format", "delta")
+    .put("external", "true")
+    .put("location", "s3://my-bucket/delta-tables/sales_partitioned")
+    .build();
+
+// Specify identity partitions (metadata only)
+Transform[] partitions = new Transform[] {
+    Transforms.identity("year"),
+    Transforms.identity("month")
+};
+
+tableCatalog.createTable(
+    NameIdentifier.of("sales", "sales_partitioned"),
+    columns,
+    "Partitioned sales data",
+    properties,
+    partitions,  // Identity partitions supported
+    null,
+    null,
+    null);
+```
+
+:::note
+Partition information in Gravitino is **metadata only**:
+- Only **identity transforms** are supported (e.g., `Transforms.identity("column")`)
+- Non-identity transforms (bucket, truncate, year, month, etc.) will be rejected
+- The actual partitioning is managed by Delta Lake in the _delta_log
+- **User responsibility**: Ensure the partition metadata you provide matches the actual Delta table's partitioning
+- Gravitino does not validate partition metadata against the Delta transaction log
+:::
+
+## Advanced Topics
+
+### Troubleshooting
+
+#### Common Issues
+
+**Issue: "Gravitino only supports creating external Delta tables"**
+```
+Solution: Ensure you set "external": "true" in the table properties
+```
+
+**Issue: "Property 'location' is required for external Delta tables"**
+```
+Solution: Specify the location property pointing to your Delta table directory
+```
+
+**Issue: "ALTER TABLE operations are not supported"**
+```
+Solution: Use Delta Lake APIs (Spark, Delta-rs, etc.) to modify the table,
+then optionally drop and re-register in Gravitino with updated schema
+```
+
+**Issue: "Purge operation is not supported for external Delta tables"**
+```
+Solution: Use dropTable() to remove metadata only. To delete data, 
+manually remove files from the storage location
+```
+
+## Limitations and Future Work
+
+### Current Limitations
+
+- **Managed Tables**: Not supported; only external tables are available
+- **ALTER Operations**: Cannot modify table schema through Gravitino; use Delta Lake APIs
+- **Partitioning**: Only identity partitions supported; stored as metadata only (not validated against Delta log)
+- **Indexes**: Not supported in CREATE TABLE
+- **Time Travel**: Access via Delta Lake APIs directly; not exposed through Gravitino
+
+### Planned Enhancements
+
+Future versions may include:
+- Support for managed Delta tables (requires Delta Lake 4.0+ CommitCoordinator)
+- Schema evolution tracking
+- Integration with Delta Lake time travel features
+- Enhanced metadata synchronization
+
+## See Also
+
+- [Generic Lakehouse Catalog](./lakehouse-generic-catalog.md)
+- [Table Operations](./manage-relational-metadata-using-gravitino.md#table-operations)
+- [Delta Lake Documentation](https://docs.delta.io/)
+- [Delta Lake GitHub](https://github.com/delta-io/delta)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d50ffcfddf..43cfd73d8d 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -30,6 +30,7 @@ slf4j = "2.0.16"
 log4j = "2.24.3"
 lance = "0.39.0"
 lance-namespace = "0.0.20"
+delta-kernel = "3.3.0"
 jetty = "9.4.51.v20230217"
 jersey = "2.41"
 mockito = "4.11.0"
@@ -171,6 +172,8 @@ log4j-12-api = { group = "org.apache.logging.log4j", name = "log4j-1.2-api", ver
 log4j-layout-template-json = { group = "org.apache.logging.log4j", name = "log4j-layout-template-json", version.ref = "log4j" }
 lance = { group = "com.lancedb", name = "lance-core", version.ref = "lance" }
 lance-namespace-core = { group = "com.lancedb", name = "lance-namespace-core", version.ref = "lance-namespace" }
+delta-kernel = { group = "io.delta", name = "delta-kernel-api", version.ref = "delta-kernel" }
+delta-kernel-defaults = { group = "io.delta", name = "delta-kernel-defaults", version.ref = "delta-kernel" }
 jakarta-validation-api = { group = "jakarta.validation", name = "jakarta.validation-api", version.ref = "jakarta-validation" }
 jetty-server = { group = "org.eclipse.jetty", name = "jetty-server", version.ref = "jetty" }
 jetty-servlet = { group = "org.eclipse.jetty", name = "jetty-servlet", version.ref = "jetty" }
