This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by 
this push:
     new f313b70f9c [#8915] improvment(catalogs): Polish code for PR #8879 
(#8922)
f313b70f9c is described below

commit f313b70f9c59969a7d95d99b72a02d23c17bee05
Author: Mini Yu <[email protected]>
AuthorDate: Thu Oct 30 15:01:12 2025 +0800

    [#8915] improvment(catalogs): Polish code for PR #8879 (#8922)
    
    ### What changes were proposed in this pull request?
    
    This PR tries to resolve the review comments that were not addressed in
    #8879
    
    ### Why are the changes needed?
    
    It's an improvement.
    
    Fix: #8915
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    Tested locally; integration tests (ITs) will be added in
    https://github.com/apache/gravitino/issues/8921
---
 .../main/java/org/apache/gravitino/rel/Table.java  |  31 ++++
 .../catalog-generic-lakehouse/build.gradle.kts     |   1 -
 .../GenericLakehouseCatalogOperations.java         | 197 ++++++++++++++-------
 .../GenericLakehouseCatalogPropertiesMetadata.java |   4 +-
 .../GenericLakehouseSchemaPropertiesMetadata.java  |   6 +-
 .../GenericLakehouseTablePropertiesMetadata.java   |  19 +-
 .../lakehouse/LakehouseCatalogOperations.java      |  14 ++
 .../catalog/lakehouse/LakehouseTableFormat.java    |  38 ++--
 .../lakehouse/lance/LanceCatalogOperations.java    | 126 ++++++++-----
 .../catalog/lakehouse/utils/EntityConverter.java   |  41 ++---
 .../catalog/lakehouse/TestPropertiesMetadata.java  | 102 +++++++++++
 .../lakehouse/utils/TestEntityConverter.java       |  78 ++++++++
 .../gravitino/catalog/EntityCombinedTable.java     |  10 ++
 .../catalog/TableOperationDispatcher.java          | 181 ++++++++-----------
 .../connector/GenericLakehouseColumn.java          |   1 +
 .../gravitino/connector/GenericLakehouseTable.java |  14 +-
 .../apache/gravitino/meta/GenericTableEntity.java  | 186 -------------------
 .../org/apache/gravitino/meta/TableEntity.java     | 112 +++++++++++-
 .../storage/relational/mapper/TableMetaMapper.java |   3 -
 .../relational/mapper/TableVersionMapper.java      |  14 ++
 .../mapper/TableVersionSQLProviderFactory.java     |  10 ++
 .../provider/base/TableMetaBaseSQLProvider.java    |  43 +++--
 .../provider/base/TableVersionBaseSQLProvider.java |  16 ++
 .../postgresql/TableVersionPostgreSQLProvider.java |  11 +-
 .../relational/service/TableMetaService.java       |  46 +++--
 .../storage/relational/utils/POConverters.java     | 119 ++++---------
 .../gravitino/catalog/TestOperationDispatcher.java |   1 +
 .../gravitino/hook/TestTableHookDispatcher.java    |   7 +
 .../java/org/apache/gravitino/meta/TestEntity.java |  35 +++-
 .../gravitino/storage/TestEntityStorage.java       |  27 +--
 .../storage/relational/TestJDBCBackend.java        | 123 +++++++++++++
 .../storage/relational/utils/TestPOConverters.java |   3 +-
 .../service/rest/LanceNamespaceOperations.java     |   2 +
 .../lance/service/rest/LanceTableOperations.java   |   9 +
 scripts/h2/schema-1.1.0-h2.sql                     |   5 +-
 scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql           |   5 +-
 scripts/mysql/schema-1.1.0-mysql.sql               |   5 +-
 scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql     |   5 +-
 scripts/postgresql/schema-1.1.0-postgresql.sql     |   8 +-
 .../upgrade-1.0.0-to-1.1.0-postgresql.sql          |   8 +-
 40 files changed, 1038 insertions(+), 628 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/rel/Table.java 
b/api/src/main/java/org/apache/gravitino/rel/Table.java
index 619694b1a2..316ba18839 100644
--- a/api/src/main/java/org/apache/gravitino/rel/Table.java
+++ b/api/src/main/java/org/apache/gravitino/rel/Table.java
@@ -99,6 +99,37 @@ public interface Table extends Auditable {
     return Collections.emptyMap();
   }
 
+  /**
+   * Table format of the table. For example, in a file-based table, it could 
be "parquet", "Lance",
+   * "Iceberg", etc.
+   *
+   * @return the table format name, for more information: LakehouseTableFormat
+   */
+  default String format() {
+    throw new UnsupportedOperationException("Table format is not supported.");
+  }
+
+  /**
+   * Gets the location of the table if the table has a location. For example, 
in a file-based table,
+   * it could be the root path where the table data is stored.
+   *
+   * @return the location of the table as a string.
+   */
+  default String location() {
+    throw new UnsupportedOperationException("Table location is not 
supported.");
+  }
+
+  /**
+   * Indicates whether the table is external. An external table is a table 
that is not managed by
+   * the catalog and the drop operation will not delete the underlying data. 
If it's a managed
+   * table, dropping the table will delete the underlying data.
+   *
+   * @return true if the table is external, false otherwise
+   */
+  default boolean external() {
+    return false;
+  }
+
   /**
    * Table method for working with partitions. If the table does not support 
partition operations,
    * an {@link UnsupportedOperationException} is thrown.
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts 
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index df401dcde4..704dbda7e3 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,7 +43,6 @@ dependencies {
   implementation(libs.commons.lang3)
   implementation(libs.guava)
   implementation(libs.hadoop3.client.api)
-  implementation(libs.hadoop3.client.runtime)
   implementation(libs.lance)
 
   annotationProcessor(libs.lombok)
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 358c2dcab5..89a0ef58ef 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -19,15 +19,16 @@
 package org.apache.gravitino.catalog.lakehouse;
 
 import static org.apache.gravitino.Entity.EntityType.TABLE;
-import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
@@ -40,8 +41,10 @@ import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.catalog.ManagedSchemaOperations;
 import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
+import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.connector.GenericLakehouseTable;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.SupportsSchemas;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -51,8 +54,10 @@ import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
-import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
@@ -61,6 +66,8 @@ import 
org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,14 +81,16 @@ public class GenericLakehouseCatalogOperations
   private static final String SLASH = "/";
 
   private final ManagedSchemaOperations managedSchemaOps;
-  private static final Map<String, LakehouseCatalogOperations> 
SUPPORTED_FORMATS =
-      Maps.newHashMap();
 
-  private Optional<Path> catalogLakehouseDir;
+  private Optional<Path> catalogLakehouseLocation;
+
+  private static final Map<LakehouseTableFormat, LakehouseCatalogOperations> 
SUPPORTED_FORMATS =
+      Maps.newConcurrentMap();
+
   private Map<String, String> catalogConfig;
   private CatalogInfo catalogInfo;
   private HasPropertyMetadata propertiesMetadata;
-
+  private EntityStore store;
   /**
    * Initializes the generic lakehouse catalog operations with the provided 
configuration.
    *
@@ -94,15 +103,16 @@ public class GenericLakehouseCatalogOperations
   public void initialize(
       Map<String, String> conf, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
       throws RuntimeException {
-    String catalogDir =
+    String catalogLocation =
         (String)
             propertiesMetadata
                 .catalogPropertiesMetadata()
-                .getOrDefault(conf, 
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_DIR);
-    this.catalogLakehouseDir =
-        StringUtils.isNotBlank(catalogDir)
-            ? 
Optional.of(catalogDir).map(this::ensureTrailingSlash).map(Path::new)
+                .getOrDefault(conf, 
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
+    this.catalogLakehouseLocation =
+        StringUtils.isNotBlank(catalogLocation)
+            ? 
Optional.of(catalogLocation).map(this::ensureTrailingSlash).map(Path::new)
             : Optional.empty();
+    this.store = GravitinoEnv.getInstance().entityStore();
     this.catalogConfig = conf;
     this.catalogInfo = info;
     this.propertiesMetadata = propertiesMetadata;
@@ -165,19 +175,17 @@ public class GenericLakehouseCatalogOperations
 
   @Override
   public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
-    EntityStore store = GravitinoEnv.getInstance().entityStore();
     NameIdentifier identifier = NameIdentifier.of(namespace.levels());
     try {
       store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
-    } catch (NoSuchTableException e) {
-      throw new NoSuchEntityException(e, "Schema %s does not exist", 
namespace);
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", 
namespace);
     } catch (IOException ioe) {
       throw new RuntimeException("Failed to get schema " + identifier);
     }
 
     try {
-      List<GenericTableEntity> tableEntityList =
-          store.list(namespace, GenericTableEntity.class, TABLE);
+      List<TableEntity> tableEntityList = store.list(namespace, 
TableEntity.class, TABLE);
       return tableEntityList.stream()
           .map(e -> NameIdentifier.of(namespace, e.name()))
           .toArray(NameIdentifier[]::new);
@@ -188,7 +196,23 @@ public class GenericLakehouseCatalogOperations
 
   @Override
   public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    try {
+      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
+      return GenericLakehouseTable.builder()
+          .withFormat(tableEntity.getFormat())
+          .withProperties(tableEntity.getProperties())
+          .withAuditInfo(tableEntity.auditInfo())
+          .withSortOrders(tableEntity.getSortOrder())
+          .withPartitioning(tableEntity.getPartitions())
+          .withDistribution(tableEntity.getDistribution())
+          .withColumns(EntityConverter.toColumns(tableEntity.columns()))
+          .withIndexes(tableEntity.getIndexes())
+          .withName(tableEntity.name())
+          .withComment(tableEntity.getComment())
+          .build();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables under schema " + 
ident.namespace(), e);
+    }
   }
 
   @Override
@@ -202,54 +226,92 @@ public class GenericLakehouseCatalogOperations
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
+    LakehouseTableFormat format =
+        (LakehouseTableFormat)
+            propertiesMetadata
+                .tablePropertiesMetadata()
+                .getOrDefault(properties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
     Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
+
     String tableLocation = calculateTableLocation(schema, ident, properties);
     Map<String, String> tableStorageProps = calculateTableStorageProps(schema, 
properties);
 
     Map<String, String> newProperties = Maps.newHashMap(properties);
-    newProperties.put(LOCATION, tableLocation);
+    
newProperties.put(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION, 
tableLocation);
     newProperties.putAll(tableStorageProps);
 
-    String format = properties.getOrDefault("format", "lance");
-    LakehouseCatalogOperations lakehouseCatalogOperations =
-        SUPPORTED_FORMATS.compute(
-            format,
-            (k, v) ->
-                v == null
-                    ? createLakehouseCatalogOperations(
-                        format, properties, catalogInfo, propertiesMetadata)
-                    : v);
-
-    return lakehouseCatalogOperations.createTable(
-        ident, columns, comment, newProperties, partitions, distribution, 
sortOrders, indexes);
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+    IdGenerator idGenerator = GravitinoEnv.getInstance().idGenerator();
+    List<ColumnEntity> columnEntityList =
+        IntStream.range(0, columns.length)
+            .mapToObj(
+                i -> ColumnEntity.toColumnEntity(columns[i], i, 
idGenerator.nextId(), auditInfo))
+            .collect(Collectors.toList());
+
+    TableEntity entityToStore;
+    try {
+      entityToStore =
+          TableEntity.builder()
+              .withName(ident.name())
+              .withNamespace(ident.namespace())
+              .withColumns(columnEntityList)
+              .withFormat(format.lowerName())
+              .withProperties(newProperties)
+              .withComment(comment)
+              .withPartitions(partitions)
+              .withSortOrder(sortOrders)
+              .withDistribution(distribution)
+              .withIndexes(indexes)
+              .withId(idGenerator.nextId())
+              .withAuditInfo(auditInfo)
+              .build();
+      store.put(entityToStore);
+      LakehouseCatalogOperations lanceCatalogOperations =
+          getLakehouseCatalogOperations(newProperties);
+      return lanceCatalogOperations.createTable(
+          ident, columns, comment, newProperties, partitions, distribution, 
sortOrders, indexes);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create table " + ident, e);
+    }
   }
 
   private String calculateTableLocation(
       Schema schema, NameIdentifier tableIdent, Map<String, String> 
tableProperties) {
-    String tableLocation = tableProperties.get(LOCATION);
+    String tableLocation =
+        (String)
+            propertiesMetadata
+                .tablePropertiesMetadata()
+                .getOrDefault(
+                    tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
     if (StringUtils.isNotBlank(tableLocation)) {
       return ensureTrailingSlash(tableLocation);
     }
 
-    String schemaLocation = schema.properties() == null ? null : 
schema.properties().get(LOCATION);
+    String schemaLocation =
+        schema.properties() == null
+            ? null
+            : 
schema.properties().get(GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
 
     // If we do not set location in table properties, and schema location is 
set, use schema
-    // location
-    // as the base path.
+    // location as the base path.
     if (StringUtils.isNotBlank(schemaLocation)) {
       return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
     }
 
     // If the schema location is not set, use catalog lakehouse dir as the 
base path. Or else, throw
     // an exception.
-    if (catalogLakehouseDir.isEmpty()) {
+    if (catalogLakehouseLocation.isEmpty()) {
       throw new RuntimeException(
           String.format(
               "No location specified for table %s, you need to set location 
either in catalog, schema, or table properties",
               tableIdent));
     }
 
-    String catalogLakehousePath = catalogLakehouseDir.get().toString();
+    String catalogLakehousePath = catalogLakehouseLocation.get().toString();
     String[] nsLevels = tableIdent.namespace().levels();
     String schemaName = nsLevels[nsLevels.length - 1];
     return ensureTrailingSlash(catalogLakehousePath)
@@ -262,21 +324,12 @@ public class GenericLakehouseCatalogOperations
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    EntityStore store = GravitinoEnv.getInstance().entityStore();
     Namespace namespace = ident.namespace();
     try {
-      GenericTableEntity tableEntity =
-          store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
       Map<String, String> tableProperties = tableEntity.getProperties();
-      String format = tableProperties.getOrDefault("format", "lance");
       LakehouseCatalogOperations lakehouseCatalogOperations =
-          SUPPORTED_FORMATS.compute(
-              format,
-              (k, v) ->
-                  v == null
-                      ? createLakehouseCatalogOperations(
-                          format, tableProperties, catalogInfo, 
propertiesMetadata)
-                      : v);
+          getLakehouseCatalogOperations(tableProperties);
       return lakehouseCatalogOperations.alterTable(ident, changes);
     } catch (IOException e) {
       throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
@@ -285,28 +338,36 @@ public class GenericLakehouseCatalogOperations
 
   @Override
   public boolean dropTable(NameIdentifier ident) {
-    EntityStore store = GravitinoEnv.getInstance().entityStore();
-    GenericTableEntity tableEntity;
+    Namespace namespace = ident.namespace();
     try {
-      tableEntity = store.get(ident, Entity.EntityType.TABLE, 
GenericTableEntity.class);
-    } catch (NoSuchEntityException e) {
-      LOG.warn("Table {} does not exist, skip dropping.", ident);
+      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
+      LakehouseCatalogOperations lakehouseCatalogOperations =
+          getLakehouseCatalogOperations(tableEntity.getProperties());
+      return lakehouseCatalogOperations.dropTable(ident);
+    } catch (NoSuchTableException e) {
+      LOG.warn("Table {} does not exist, skip dropping it.", ident);
       return false;
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to get table " + ident);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
     }
+  }
 
-    Map<String, String> tableProperties = tableEntity.getProperties();
-    String format = tableProperties.getOrDefault("format", "lance");
-    LakehouseCatalogOperations lakehouseCatalogOperations =
-        SUPPORTED_FORMATS.compute(
-            format,
-            (k, v) ->
-                v == null
-                    ? createLakehouseCatalogOperations(
-                        format, tableProperties, catalogInfo, 
propertiesMetadata)
-                    : v);
-    return lakehouseCatalogOperations.dropTable(ident);
+  private LakehouseCatalogOperations getLakehouseCatalogOperations(
+      Map<String, String> tableProperties) {
+    LakehouseTableFormat format =
+        (LakehouseTableFormat)
+            propertiesMetadata
+                .tablePropertiesMetadata()
+                .getOrDefault(
+                    tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
+
+    return SUPPORTED_FORMATS.compute(
+        format,
+        (k, v) ->
+            v == null
+                ? createLakehouseCatalogOperations(
+                    format, tableProperties, catalogInfo, propertiesMetadata)
+                : v);
   }
 
   private String ensureTrailingSlash(String path) {
@@ -314,13 +375,13 @@ public class GenericLakehouseCatalogOperations
   }
 
   private LakehouseCatalogOperations createLakehouseCatalogOperations(
-      String format,
+      LakehouseTableFormat format,
       Map<String, String> properties,
       CatalogInfo catalogInfo,
       HasPropertyMetadata propertiesMetadata) {
     LakehouseCatalogOperations operations;
-    switch (format.toLowerCase()) {
-      case "lance":
+    switch (format) {
+      case LANCE:
         operations = new LanceCatalogOperations();
         break;
       default:
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
index e381558c32..b8c3958e9a 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
@@ -31,7 +31,7 @@ import org.apache.gravitino.connector.PropertyEntry;
 
 public class GenericLakehouseCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
 
-  public static final String LAKEHOUSE_DIR = "lakehouse-dir";
+  public static final String LAKEHOUSE_LOCATION = "location";
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
@@ -39,7 +39,7 @@ public class GenericLakehouseCatalogPropertiesMetadata 
extends BaseCatalogProper
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringOptionalPropertyEntry(
-                LAKEHOUSE_DIR,
+                LAKEHOUSE_LOCATION,
                 "The root directory of the lakehouse catalog.",
                 false /* immutable */,
                 null, /* defaultValue */
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
index a6da0ac2de..3dd0abf81d 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
@@ -29,8 +29,8 @@ import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
 public class GenericLakehouseSchemaPropertiesMetadata extends 
BasePropertiesMetadata {
-  public static final String LAKEHOUSE_DIR =
-      GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_DIR;
+  public static final String LAKEHOUSE_LOCATION =
+      GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION;
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
@@ -38,7 +38,7 @@ public class GenericLakehouseSchemaPropertiesMetadata extends 
BasePropertiesMeta
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringOptionalPropertyEntry(
-                LAKEHOUSE_DIR,
+                LAKEHOUSE_LOCATION,
                 "The root directory of the lakehouse schema.",
                 false /* immutable */,
                 null, /* defaultValue */
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
index e9a61a6b0f..f8ca11b0a0 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.catalog.lakehouse;
 
+import static org.apache.gravitino.connector.PropertyEntry.enumPropertyEntry;
 import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
 
 import com.google.common.collect.ImmutableList;
@@ -28,7 +29,8 @@ import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
 public class GenericLakehouseTablePropertiesMetadata extends 
BasePropertiesMetadata {
-  public static final String LOCATION = "location";
+  public static final String LAKEHOUSE_LOCATION = "location";
+  public static final String LAKEHOUSE_FORMAT = "format";
   public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX = 
"lance.storage.";
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -37,11 +39,20 @@ public class GenericLakehouseTablePropertiesMetadata 
extends BasePropertiesMetad
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringOptionalPropertyEntry(
-                LOCATION,
-                "The root directory of the lakehouse table.",
-                true /* immutable */,
+                LAKEHOUSE_LOCATION,
+                "The root directory of the lakehouse catalog.",
+                false /* immutable */,
                 null, /* defaultValue */
                 false /* hidden */),
+            enumPropertyEntry(
+                LAKEHOUSE_FORMAT,
+                "The table format of the lakehouse table (e.g., iceberg, 
delta, lance)",
+                true /* required */,
+                true /* immutable */,
+                LakehouseTableFormat.class /* enumClass */,
+                null /* defaultValue */,
+                false /* hidden */,
+                false /* reserved */),
             PropertyEntry.stringOptionalPropertyPrefixEntry(
                 LANCE_TABLE_STORAGE_OPTION_PREFIX,
                 "The storage options passed to Lance table.",
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
index 66c7147626..d5b95845db 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
@@ -22,4 +22,18 @@ package org.apache.gravitino.catalog.lakehouse;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.rel.TableCatalog;
 
+/**
+ * Interface for detailed lakehouse catalog operations, combining catalog 
operations and table
+ * catalog. {@link GenericLakehouseCatalog} will try to use this interface to 
provide detailed
+ * lakehouse catalog operations.
+ *
+ * <pre>
+ *    GenericLakehouseCatalog.createTable()
+ *       -> LakehouseCatalogOperations.createTable()
+ *         -> LanceTableOperations.createTable()
+ *         -> IcebergTableOperations.createTable()
+ *         -> DeltaTableOperations.createTable()
+ *         ...
+ * </pre>
+ */
 public interface LakehouseCatalogOperations extends CatalogOperations, 
TableCatalog {}
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
similarity index 58%
copy from api/src/main/java/org/apache/gravitino/rel/GenericTable.java
copy to 
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
index 4796421c53..57d0230f48 100644
--- a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
@@ -17,31 +17,25 @@
  *  under the License.
  */
 
-package org.apache.gravitino.rel;
+package org.apache.gravitino.catalog.lakehouse;
 
-/** A generic table interface that extends the Table interface. */
-public interface GenericTable extends Table {
+public enum LakehouseTableFormat {
+  LANCE,
 
-  /**
-   * Formats the table as a string representation.
-   *
-   * @return the formatted string representation of the table
-   */
-  String format();
+  DELTA,
 
-  /**
-   * Gets the location of the table.
-   *
-   * @return the location of the table
-   */
-  String location();
+  ICEBERG;
 
-  /**
-   * Indicates whether the table is external.
-   *
-   * @return true if the table is external, false otherwise
-   */
-  default boolean external() {
-    return false;
+  public String lowerName() {
+    return this.name().toLowerCase();
+  }
+
+  public static LakehouseTableFormat fromFormatName(String type) {
+    for (LakehouseTableFormat tableType : LakehouseTableFormat.values()) {
+      if (tableType.name().equalsIgnoreCase(type)) {
+        return tableType;
+      }
+    }
+    throw new IllegalArgumentException("Unknown LakehouseTableFormat: " + 
type);
   }
 }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 9572c656d2..e27f8032ab 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -19,8 +19,8 @@
 
 package org.apache.gravitino.catalog.lakehouse.lance;
 
+import static org.apache.gravitino.Entity.EntityType.TABLE;
 import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
-import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -46,7 +46,10 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata;
 import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
+import org.apache.gravitino.catalog.lakehouse.LakehouseTableFormat;
+import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.GenericLakehouseTable;
 import org.apache.gravitino.connector.HasPropertyMetadata;
@@ -55,9 +58,8 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.GenericTable;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -66,19 +68,16 @@ import 
org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
 import org.apache.gravitino.utils.PrincipalUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 public class LanceCatalogOperations implements LakehouseCatalogOperations {
 
-  private Map<String, String> lancePropertiesMap;
+  private EntityStore store;
 
   @Override
   public void initialize(
       Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
       throws RuntimeException {
-    lancePropertiesMap = ImmutableMap.copyOf(config);
+    store = GravitinoEnv.getInstance().entityStore();
   }
 
   @Override
@@ -95,13 +94,16 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
 
   @Override
   public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
-    return new NameIdentifier[0];
+    // Nothing to do here; GenericLakehouseCatalogOperations will do the work.
+    throw new UnsupportedOperationException(
+        "We should not reach here as we could get table info from metastore directly.");
   }
 
   @Override
   public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
-    // Should not come here.
-    return null;
+    // Nothing to do here; GenericLakehouseCatalogOperations will do the work.
+    throw new UnsupportedOperationException(
+        "We should not reach here as we could get table info from metastore directly.");
   }
 
   @Override
@@ -116,7 +118,7 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
     // Ignore partitions, distributions, sortOrders, and indexes for Lance 
tables;
-    String location = properties.get(LOCATION);
+    String location = 
properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
     Map<String, String> storageProps =
         properties.entrySet().stream()
             .filter(e -> 
e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
@@ -124,7 +126,8 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
                 Collectors.toMap(
                     e -> 
e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
                     Map.Entry::getValue));
-    try (Dataset dataset =
+
+    try (Dataset ignored =
         Dataset.create(
             new RootAllocator(),
             location,
@@ -145,7 +148,7 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
                   .build())
           .withPartitioning(partitions)
           .withSortOrders(sortOrders)
-          .withFormat("lance")
+          .withFormat(LakehouseTableFormat.LANCE.lowerName())
           .build();
     }
   }
@@ -167,6 +170,7 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
     // Lance only supports adding indexes for now.
     List<Index> addedIndexes = Lists.newArrayList();
 
+    // Only support for adding index for now.
     for (TableChange change : changes) {
       if (change instanceof TableChange.AddIndex addIndexChange) {
         Index index =
@@ -179,17 +183,66 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
       }
     }
 
-    EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
-    GenericTableEntity entity;
+    TableEntity updatedEntity;
     try {
-      entity = entityStore.get(ident, Entity.EntityType.TABLE, 
GenericTableEntity.class);
+      TableEntity entity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
+      updatedEntity =
+          store.update(
+              ident,
+              TableEntity.class,
+              TABLE,
+              tableEntity ->
+                  TableEntity.builder()
+                      .withId(tableEntity.id())
+                      .withName(tableEntity.name())
+                      .withNamespace(tableEntity.namespace())
+                      .withFormat(entity.getFormat())
+                      .withAuditInfo(
+                          AuditInfo.builder()
+                              .withCreator(tableEntity.auditInfo().creator())
+                              
.withCreateTime(tableEntity.auditInfo().createTime())
+                              
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                              .withLastModifiedTime(Instant.now())
+                              .build())
+                      .withColumns(tableEntity.columns())
+                      .withIndexes(
+                          ArrayUtils.addAll(
+                              entity.getIndexes(), addedIndexes.toArray(new 
Index[0])))
+                      .withDistribution(tableEntity.getDistribution())
+                      .withPartitions(tableEntity.getPartitions())
+                      .withSortOrder(tableEntity.getSortOrder())
+                      .withProperties(tableEntity.getProperties())
+                      .withComment(tableEntity.getComment())
+                      .build());
+
+      // Add indexes to Lance dataset
+      addLanceIndex(updatedEntity, addedIndexes);
+
+      // return the updated table
+      return GenericLakehouseTable.builder()
+          .withFormat(updatedEntity.getFormat())
+          .withProperties(updatedEntity.getProperties())
+          .withAuditInfo(updatedEntity.auditInfo())
+          .withSortOrders(updatedEntity.getSortOrder())
+          .withPartitioning(updatedEntity.getPartitions())
+          .withDistribution(updatedEntity.getDistribution())
+          .withColumns(EntityConverter.toColumns(updatedEntity.columns()))
+          .withIndexes(updatedEntity.getIndexes())
+          .withName(updatedEntity.name())
+          .withComment(updatedEntity.getComment())
+          .build();
     } catch (NoSuchEntityException e) {
       throw new NoSuchTableException("No such table: %s", ident);
     } catch (IOException ioe) {
       throw new RuntimeException("Failed to load table entity for: " + ident, 
ioe);
     }
+  }
 
-    String location = entity.getProperties().get("location");
+  private void addLanceIndex(TableEntity updatedEntity, List<Index> 
addedIndexes) {
+    String location =
+        updatedEntity
+            .getProperties()
+            .get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
     try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
       // For Lance, we only support adding indexes, so in fact, we can't 
handle drop index here.
       for (Index index : addedIndexes) {
@@ -205,28 +258,7 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
             indexParams,
             true);
       }
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to alter Lance table: " + ident, e);
     }
-
-    GenericTable oldTable = entity.toGenericTable();
-    Index[] newIndexes = oldTable.index();
-    for (Index index : addedIndexes) {
-      newIndexes = ArrayUtils.add(newIndexes, index);
-    }
-
-    return GenericLakehouseTable.builder()
-        .withFormat(oldTable.format())
-        .withProperties(oldTable.properties())
-        .withAuditInfo((AuditInfo) oldTable.auditInfo())
-        .withSortOrders(oldTable.sortOrder())
-        .withPartitioning(oldTable.partitioning())
-        .withDistribution(oldTable.distribution())
-        .withColumns(oldTable.columns())
-        .withIndexes(newIndexes)
-        .withName(oldTable.name())
-        .withComment(oldTable.comment())
-        .build();
   }
 
   private IndexParams getIndexParamsByIndexType(IndexType indexType) {
@@ -249,10 +281,18 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
   @Override
   public boolean dropTable(NameIdentifier ident) {
     try {
-      String location = lancePropertiesMap.get("location");
-      // Remove the directory on storage
-      FileSystem fs = FileSystem.get(new Configuration());
-      return fs.delete(new Path(location), true);
+      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
+      Map<String, String> lancePropertiesMap = tableEntity.getProperties();
+      String location =
+          
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
+
+      if (!store.delete(ident, Entity.EntityType.TABLE)) {
+        throw new RuntimeException("Failed to drop Lance table: " + 
ident.name());
+      }
+
+      // Drop the Lance dataset from cloud storage.
+      Dataset.drop(location, ImmutableMap.of());
+      return true;
     } catch (IOException e) {
       throw new RuntimeException("Failed to drop Lance table: " + 
ident.name(), e);
     }
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
similarity index 50%
rename from api/src/main/java/org/apache/gravitino/rel/GenericTable.java
rename to 
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
index 4796421c53..734309a444 100644
--- a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
@@ -17,31 +17,26 @@
  *  under the License.
  */
 
-package org.apache.gravitino.rel;
+package org.apache.gravitino.catalog.lakehouse.utils;
 
-/** A generic table interface that extends the Table interface. */
-public interface GenericTable extends Table {
+import java.util.List;
+import org.apache.gravitino.connector.GenericLakehouseColumn;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.rel.Column;
 
-  /**
-   * Formats the table as a string representation.
-   *
-   * @return the formatted string representation of the table
-   */
-  String format();
-
-  /**
-   * Gets the location of the table.
-   *
-   * @return the location of the table
-   */
-  String location();
+public class EntityConverter {
+  public static Column[] toColumns(List<ColumnEntity> columnEntities) {
+    return 
columnEntities.stream().map(EntityConverter::toColumn).toArray(Column[]::new);
+  }
 
-  /**
-   * Indicates whether the table is external.
-   *
-   * @return true if the table is external, false otherwise
-   */
-  default boolean external() {
-    return false;
+  private static Column toColumn(ColumnEntity columnEntity) {
+    return GenericLakehouseColumn.builder()
+        .withName(columnEntity.name())
+        .withComment(columnEntity.comment())
+        .withAutoIncrement(columnEntity.autoIncrement())
+        .withNullable(columnEntity.nullable())
+        .withType(columnEntity.dataType())
+        .withDefaultValue(columnEntity.defaultValue())
+        .build();
   }
 }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
 
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
new file mode 100644
index 0000000000..8dfd3b5ce0
--- /dev/null
+++ 
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
@@ -0,0 +1,102 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertiesMetadata;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestPropertiesMetadata {
+  public static GenericLakehouseCatalog genericLakehouseCatalog;
+
+  @BeforeAll
+  static void init() {
+    genericLakehouseCatalog = new GenericLakehouseCatalog();
+  }
+
+  @Test
+  void testCatalogPropertiesMetadata() {
+    PropertiesMetadata catalogPropertiesMetadata =
+        genericLakehouseCatalog.catalogPropertiesMetadata();
+    Assertions.assertNotNull(catalogPropertiesMetadata);
+
+    Map<String, String> catalogProperties =
+        ImmutableMap.of(
+            "storage.type", "s3",
+            "storage.s3.bucket", "my-bucket",
+            "storage.s3.region", "us-west-2",
+            "location", "/tmp/test1");
+
+    String catalogLocation =
+        (String)
+            catalogPropertiesMetadata.getOrDefault(
+                catalogProperties, 
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
+    Assertions.assertEquals("/tmp/test1", catalogLocation);
+  }
+
+  @Test
+  void testSchemaPropertiesMetadata() {
+    PropertiesMetadata schemaPropertiesMetadata =
+        genericLakehouseCatalog.schemaPropertiesMetadata();
+    Assertions.assertNotNull(schemaPropertiesMetadata);
+
+    Map<String, String> schemaProperties =
+        ImmutableMap.of(
+            "storage.type", "s3",
+            "storage.s3.bucket", "my-bucket",
+            "storage.s3.region", "us-west-2",
+            "location", "/tmp/test_schema");
+
+    String schemaLocation =
+        (String)
+            schemaPropertiesMetadata.getOrDefault(
+                schemaProperties, 
GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
+    Assertions.assertEquals("/tmp/test_schema", schemaLocation);
+  }
+
+  @Test
+  void testTablePropertiesMetadata() {
+    PropertiesMetadata tablePropertiesMetadata = 
genericLakehouseCatalog.tablePropertiesMetadata();
+    Assertions.assertNotNull(tablePropertiesMetadata);
+
+    Map<String, String> tableProperties =
+        ImmutableMap.of(
+            "storage.type", "s3",
+            "storage.s3.bucket", "my-bucket",
+            "storage.s3.region", "us-west-2",
+            "location", "/tmp/test_table",
+            "format", "iceberg");
+
+    String tableLocation =
+        (String)
+            tablePropertiesMetadata.getOrDefault(
+                tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
+    Assertions.assertEquals("/tmp/test_table", tableLocation);
+
+    LakehouseTableFormat tableFormat =
+        (LakehouseTableFormat)
+            tablePropertiesMetadata.getOrDefault(
+                tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
+    Assertions.assertEquals(LakehouseTableFormat.ICEBERG, tableFormat);
+  }
+}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
 
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
new file mode 100644
index 0000000000..9da5ed530e
--- /dev/null
+++ 
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
@@ -0,0 +1,78 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.utils;
+
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.util.List;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestEntityConverter {
+
+  @Test
+  void testToColumns() {
+    AuditInfo auditInfo = AuditInfo.builder().build();
+    List<ColumnEntity> columnEntities =
+        List.of(
+            ColumnEntity.builder()
+                .withName("id")
+                .withId(1L)
+                .withDataType(Types.IntegerType.get())
+                .withComment("Identifier")
+                .withAutoIncrement(true)
+                .withNullable(false)
+                .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+                .withAuditInfo(auditInfo)
+                .withPosition(1)
+                .build(),
+            ColumnEntity.builder()
+                .withName("name")
+                .withId(2L)
+                .withDataType(Types.StringType.get())
+                .withComment("Name of the entity")
+                .withAutoIncrement(false)
+                .withNullable(true)
+                .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+                .withPosition(2)
+                .withAuditInfo(auditInfo)
+                .build());
+    var columns = EntityConverter.toColumns(columnEntities);
+    Assertions.assertEquals(2, columns.length);
+    // The input list order is preserved, so check by index; a wrong name must fail the test.
+    var idColumn = columns[0];
+    Assertions.assertEquals("id", idColumn.name());
+    Assertions.assertEquals(Types.IntegerType.get(), idColumn.dataType());
+    Assertions.assertEquals("Identifier", idColumn.comment());
+    Assertions.assertTrue(idColumn.autoIncrement());
+    Assertions.assertFalse(idColumn.nullable());
+    Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, idColumn.defaultValue());
+    var nameColumn = columns[1];
+    Assertions.assertEquals("name", nameColumn.name());
+    Assertions.assertEquals(Types.StringType.get(), nameColumn.dataType());
+    Assertions.assertEquals("Name of the entity", nameColumn.comment());
+    Assertions.assertFalse(nameColumn.autoIncrement());
+    Assertions.assertTrue(nameColumn.nullable());
+    Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, nameColumn.defaultValue());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
index 921b14dcdf..7a0d90ead7 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
@@ -129,6 +129,16 @@ public final class EntityCombinedTable implements Table {
     return table.index();
   }
 
+  @Override
+  public String format() {
+    return table.format();
+  }
+
+  @Override
+  public String location() {
+    return table.location();
+  }
+
   public boolean imported() {
     return imported;
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 27119f0c99..e549b806d8 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -27,7 +27,6 @@ import static 
org.apache.gravitino.utils.NameIdentifierUtil.getSchemaIdentifier;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,15 +36,16 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.gravitino.Catalog;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.CatalogManager.CatalogWrapper;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -54,10 +54,8 @@ import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.GenericTable;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -117,6 +115,18 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
    */
   @Override
   public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    if (isManagedTable(catalogIdent)) {
+      return TreeLockUtils.doWithTreeLock(
+          ident,
+          LockType.READ,
+          () ->
+              doWithCatalog(
+                  catalogIdent,
+                  c -> c.doWithTableOps(t -> t.loadTable(ident)),
+                  NoSuchTableException.class));
+    }
+
     EntityCombinedTable entityCombinedTable =
         TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> 
internalLoadTable(ident));
 
@@ -240,6 +250,17 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   NoSuchTableException.class,
                   IllegalArgumentException.class);
 
+          if (isManagedTable(catalogIdent)) {
+            // For generic lakehouse catalog, all operations will be 
dispatched to the underlying
+            // catalog.
+            return EntityCombinedTable.of(alteredTable)
+                .withHiddenProperties(
+                    getHiddenPropertyNames(
+                        getCatalogIdentifier(ident),
+                        HasPropertyMetadata::tablePropertiesMetadata,
+                        alteredTable.properties()));
+          }
+
           StringIdentifier stringId = 
getStringIdFromProperties(alteredTable.properties());
           // Case 1: The table is not created by Gravitino and this table is 
never imported.
           TableEntity te = null;
@@ -262,57 +283,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             tableId = te.id();
           }
 
-          if (isGenericLakehouseCatalog(catalogIdent)) {
-            // For generic lakehouse catalog, we only update the table entity 
with basic info.
-            GenericTableEntity genericTableEntity =
-                operateOnEntity(
-                    ident, id -> store.get(id, TABLE, 
GenericTableEntity.class), "GET", tableId);
-            if (genericTableEntity == null) {
-              throw new NoSuchTableException("No such table: %s", ident);
-            }
-
-            GenericTable genericTable = (GenericTable) alteredTable;
-            GenericTableEntity updatedGenericTableEntity =
-                operateOnEntity(
-                    ident,
-                    id ->
-                        store.update(
-                            id,
-                            GenericTableEntity.class,
-                            TABLE,
-                            tableEntity ->
-                                GenericTableEntity.getBuilder()
-                                    .withId(tableEntity.id())
-                                    .withName(alteredTable.name())
-                                    .withNamespace(getNewNamespace(ident, 
changes))
-                                    .withFormat(genericTable.format())
-                                    .withAuditInfo(
-                                        AuditInfo.builder()
-                                            
.withCreator(tableEntity.auditInfo().creator())
-                                            
.withCreateTime(tableEntity.auditInfo().createTime())
-                                            .withLastModifier(
-                                                
PrincipalUtils.getCurrentPrincipal().getName())
-                                            
.withLastModifiedTime(Instant.now())
-                                            .build())
-                                    .withColumns(tableEntity.columns())
-                                    .withIndexes(genericTable.index())
-                                    
.withDistribution(genericTable.distribution())
-                                    
.withPartitions(genericTable.partitioning())
-                                    .withSortOrder(genericTable.sortOrder())
-                                    .withProperties(genericTable.properties())
-                                    .withComment(genericTable.comment())
-                                    .build()),
-                    "UPDATE",
-                    tableId);
-
-            return EntityCombinedTable.of(alteredTable, 
updatedGenericTableEntity)
-                .withHiddenProperties(
-                    getHiddenPropertyNames(
-                        getCatalogIdentifier(ident),
-                        HasPropertyMetadata::tablePropertiesMetadata,
-                        alteredTable.properties()));
-          }
-
           TableEntity updatedTableEntity =
               operateOnEntity(
                   ident,
@@ -371,6 +341,13 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
         LockType.WRITE,
         () -> {
           NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+          if (isManagedTable(catalogIdent)) {
+            return doWithCatalog(
+                catalogIdent,
+                c -> c.doWithTableOps(t -> t.dropTable(ident)),
+                RuntimeException.class);
+          }
+
           boolean droppedFromCatalog =
               doWithCatalog(
                   catalogIdent,
@@ -542,19 +519,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
   }
 
   private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
-    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    if (isGenericLakehouseCatalog(catalogIdent)) {
-      try {
-        GenericTableEntity tableEntity = store.get(ident, TABLE, 
GenericTableEntity.class);
-        if (tableEntity != null) {
-          GenericTable genericTable = tableEntity.toGenericTable();
-          return EntityCombinedTable.of(genericTable).withImported(true);
-        }
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to load table entity " + ident, 
ioe);
-      }
-    }
-
     NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
     Table table =
         doWithCatalog(
@@ -627,6 +591,32 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   return null;
                 }),
         IllegalArgumentException.class);
+
+    if (isManagedTable(catalogIdent)) {
+      // For generic lakehouse catalog, all operations will be dispatched to 
the underlying catalog.
+      Table table =
+          doWithCatalog(
+              catalogIdent,
+              c ->
+                  c.doWithTableOps(
+                      t ->
+                          t.createTable(
+                              ident,
+                              columns,
+                              comment,
+                              properties,
+                              partitions == null ? EMPTY_TRANSFORM : 
partitions,
+                              distribution == null ? Distributions.NONE : 
distribution,
+                              sortOrders == null ? new SortOrder[0] : 
sortOrders,
+                              indexes == null ? Indexes.EMPTY_INDEXES : 
indexes)),
+              NoSuchSchemaException.class,
+              TableAlreadyExistsException.class);
+      return EntityCombinedTable.of(table)
+          .withHiddenProperties(
+              getHiddenPropertyNames(
+                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+    }
+
     long uid = idGenerator.nextId();
     // Add StringIdentifier to the properties, the specific catalog will handle this
     // StringIdentifier to make sure only when the operation is successful, the related
@@ -665,41 +655,19 @@ public class TableOperationDispatcher extends OperationDispatcher implements Tab
             .mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, idGenerator.nextId(), audit))
             .collect(Collectors.toList());
 
-    TableEntity tableEntity;
-    if (isGenericLakehouseCatalog(catalogIdent)) {
-      // For generic lakehouse catalog, we only create the table entity with basic info.
-      GenericTable genericTable = (GenericTable) table;
-      tableEntity =
-          GenericTableEntity.getBuilder()
-              .withId(uid)
-              .withName(ident.name())
-              .withNamespace(ident.namespace())
-              .withFormat(genericTable.format())
-              .withAuditInfo(audit)
-              .withColumns(columnEntityList)
-              .withIndexes(table.index())
-              .withDistribution(table.distribution())
-              .withFormat(genericTable.format())
-              .withPartitions(table.partitioning())
-              .withSortOrder(table.sortOrder())
-              .withProperties(genericTable.properties())
-              .withComment(genericTable.comment())
-              .build();
-    } else {
-      tableEntity =
-          TableEntity.builder()
-              .withId(uid)
-              .withName(ident.name())
-              .withNamespace(ident.namespace())
-              .withColumns(columnEntityList)
-              .withAuditInfo(audit)
-              .build();
-    }
+    TableEntity tableEntity =
+        TableEntity.builder()
+            .withId(uid)
+            .withName(ident.name())
+            .withNamespace(ident.namespace())
+            .withColumns(columnEntityList)
+            .withAuditInfo(audit)
+            .build();
 
     try {
       store.put(tableEntity, true /* overwrite */);
     } catch (Exception e) {
-      if (isGenericLakehouseCatalog(catalogIdent)) {
+      if (isManagedTable(catalogIdent)) {
         // Drop table
         doWithCatalog(
             catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)), RuntimeException.class);
@@ -727,16 +695,13 @@ public class TableOperationDispatcher extends OperationDispatcher implements Tab
             .collect(Collectors.toList());
   }
 
-  private boolean isGenericLakehouseCatalog(NameIdentifier catalogIdent) {
+  private boolean isManagedTable(NameIdentifier catalogIdent) {
     CatalogManager catalogManager = GravitinoEnv.getInstance().catalogManager();
-    try {
-      Catalog catalog = catalogManager.loadCatalog(catalogIdent);
-      return catalog.type() == Catalog.Type.RELATIONAL
-          && catalog.provider().equals("generic-lakehouse");
-    } catch (NoSuchEntityException e) {
-      LOG.warn("Catalog not found: {}", catalogIdent, e);
-      return false;
-    }
+    CatalogWrapper wrapper = catalogManager.loadCatalogAndWrap(catalogIdent);
+    Capability capability = wrapper.catalog().capability();
+
+    CapabilityResult result = capability.managedStorage(Capability.Scope.TABLE);
+    return result == CapabilityResult.SUPPORTED;
   }
 
   private boolean isSameColumn(Column left, int columnPosition, ColumnEntity right) {
diff --git a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
index b84b265256..4fb7b5d12d 100644
--- a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
+++ b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
@@ -50,6 +50,7 @@ public class GenericLakehouseColumn extends BaseColumn {
       hiveColumn.dataType = dataType;
       hiveColumn.nullable = nullable;
       hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : defaultValue;
+      hiveColumn.autoIncrement = autoIncrement;
       return hiveColumn;
     }
   }
diff --git a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
index a9379a5b31..7206125386 100644
--- a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
+++ b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
@@ -19,12 +19,7 @@
 
 package org.apache.gravitino.connector;
 
-import org.apache.gravitino.rel.GenericTable;
-
-public class GenericLakehouseTable extends BaseTable implements GenericTable {
-  @SuppressWarnings("unused")
-  private String schemaName;
-
+public class GenericLakehouseTable extends BaseTable {
   private String format;
 
   public static Builder builder() {
@@ -53,14 +48,8 @@ public class GenericLakehouseTable extends BaseTable implements GenericTable {
 
   public static class Builder extends BaseTableBuilder<Builder, GenericLakehouseTable> {
 
-    private String schemaName;
     private String format;
 
-    public Builder withSchemaName(String schemaName) {
-      this.schemaName = schemaName;
-      return this;
-    }
-
     public Builder withFormat(String format) {
       this.format = format;
       return this;
@@ -69,7 +58,6 @@ public class GenericLakehouseTable extends BaseTable implements GenericTable {
     @Override
     protected GenericLakehouseTable internalBuild() {
       GenericLakehouseTable genericLakehouseTable = new GenericLakehouseTable();
-      genericLakehouseTable.schemaName = this.schemaName;
       genericLakehouseTable.format = this.format;
       genericLakehouseTable.columns = this.columns;
       genericLakehouseTable.comment = this.comment;
diff --git a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
deleted file mode 100644
index 4b2dd9ad03..0000000000
--- a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.meta;
-
-import com.google.common.collect.Maps;
-import java.util.Map;
-import lombok.Getter;
-import org.apache.gravitino.Field;
-import org.apache.gravitino.connector.GenericLakehouseColumn;
-import org.apache.gravitino.connector.GenericLakehouseTable;
-import org.apache.gravitino.rel.GenericTable;
-import org.apache.gravitino.rel.expressions.distributions.Distribution;
-import org.apache.gravitino.rel.expressions.sorts.SortOrder;
-import org.apache.gravitino.rel.expressions.transforms.Transform;
-import org.apache.gravitino.rel.indexes.Index;
-
-@Getter
-public class GenericTableEntity extends TableEntity {
-  public static final Field FORMAT = Field.required("format", Long.class, "The table's format");
-  public static final Field PROPERTIES =
-      Field.optional("properties", Map.class, "The table's properties");
-
-  public static final Field PARTITIONS =
-      Field.optional("partitions", Transform[].class, "The table's partition");
-
-  public static final Field SORT_ORDER =
-      Field.optional("sortOrders", SortOrder[].class, "The table's sort order");
-
-  public static final Field DISTRIBUTION =
-      Field.optional("distribution", Distribution.class, "The table's distribution");
-
-  public static final Field INDEXES =
-      Field.optional("indexes", Index[].class, "The table's indexes");
-
-  public static final Field COMMENT =
-      Field.optional("comment", String.class, "The table's comment");
-
-  public GenericTableEntity() {
-    super();
-  }
-
-  @Override
-  public Map<Field, Object> fields() {
-    Map<Field, Object> superFields = super.fields();
-    Map<Field, Object> result = Maps.newHashMap(superFields);
-    result.put(FORMAT, format);
-    result.put(PROPERTIES, properties);
-    result.put(PARTITIONS, partitions);
-    result.put(SORT_ORDER, sortOrder);
-    result.put(DISTRIBUTION, distribution);
-    result.put(INDEXES, indexes);
-    result.put(COMMENT, comment);
-
-    return result;
-  }
-
-  private String format;
-  @Getter private Map<String, String> properties;
-  private Transform[] partitions;
-  private SortOrder[] sortOrder;
-  private Distribution distribution;
-  private Index[] indexes;
-  private String comment;
-
-  public static class Builder {
-    private final GenericTableEntity tableEntity;
-
-    public Builder() {
-      this.tableEntity = new GenericTableEntity();
-    }
-
-    public Builder withId(Long id) {
-      tableEntity.id = id;
-      return this;
-    }
-
-    public Builder withName(String name) {
-      tableEntity.name = name;
-      return this;
-    }
-
-    public Builder withAuditInfo(AuditInfo auditInfo) {
-      tableEntity.auditInfo = auditInfo;
-      return this;
-    }
-
-    public Builder withColumns(java.util.List<ColumnEntity> columns) {
-      tableEntity.columns = columns;
-      return this;
-    }
-
-    public Builder withNamespace(org.apache.gravitino.Namespace namespace) {
-      tableEntity.namespace = namespace;
-      return this;
-    }
-
-    public Builder withFormat(String format) {
-      tableEntity.format = format;
-      return this;
-    }
-
-    public Builder withProperties(Map<String, String> properties) {
-      tableEntity.properties = properties;
-      return this;
-    }
-
-    public Builder withPartitions(Transform[] partitions) {
-      tableEntity.partitions = partitions;
-      return this;
-    }
-
-    public Builder withSortOrder(SortOrder[] sortOrder) {
-      tableEntity.sortOrder = sortOrder;
-      return this;
-    }
-
-    public Builder withDistribution(Distribution distribution) {
-      tableEntity.distribution = distribution;
-      return this;
-    }
-
-    public Builder withIndexes(Index[] indexes) {
-      tableEntity.indexes = indexes;
-      return this;
-    }
-
-    public Builder withComment(String comment) {
-      tableEntity.comment = comment;
-      return this;
-    }
-
-    public GenericTableEntity build() {
-      return tableEntity;
-    }
-  }
-
-  public static GenericTableEntity.Builder getBuilder() {
-    return new GenericTableEntity.Builder();
-  }
-
-  public GenericTable toGenericTable() {
-    return GenericLakehouseTable.builder()
-        .withFormat(format)
-        .withProperties(properties)
-        .withAuditInfo(auditInfo)
-        .withSortOrders(sortOrder)
-        .withPartitioning(partitions)
-        .withDistribution(distribution)
-        .withColumns(
-            columns.stream()
-                .map(this::toGenericLakehouseColumn)
-                .toArray(GenericLakehouseColumn[]::new))
-        .withIndexes(indexes)
-        .withName(name)
-        .withComment(comment)
-        .build();
-  }
-
-  private GenericLakehouseColumn toGenericLakehouseColumn(ColumnEntity columnEntity) {
-    return GenericLakehouseColumn.builder()
-        .withName(columnEntity.name())
-        .withComment(columnEntity.comment())
-        .withAutoIncrement(columnEntity.autoIncrement())
-        .withNullable(columnEntity.nullable())
-        .withType(columnEntity.dataType())
-        .withDefaultValue(columnEntity.defaultValue())
-        .build();
-  }
-}
diff --git a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
index 595defed08..795db870d9 100644
--- a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
@@ -20,15 +20,21 @@ package org.apache.gravitino.meta;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import lombok.Getter;
 import lombok.ToString;
 import org.apache.gravitino.Auditable;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.Field;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.utils.CollectionUtils;
 
 /** A class representing a table entity in Apache Gravitino. */
@@ -42,15 +48,42 @@ public class TableEntity implements Entity, Auditable, HasIdentifier {
   public static final Field COLUMNS =
       Field.optional("columns", List.class, "The columns of the table");
 
-  protected Long id;
+  public static final Field FORMAT = Field.optional("format", String.class, "The table's format");
+  public static final Field PROPERTIES =
+      Field.optional("properties", Map.class, "The table's properties");
 
-  protected String name;
+  public static final Field PARTITIONS =
+      Field.optional("partitions", Transform[].class, "The table's partition");
 
-  protected AuditInfo auditInfo;
+  public static final Field SORT_ORDER =
+      Field.optional("sortOrders", SortOrder[].class, "The table's sort order");
 
-  protected Namespace namespace;
+  public static final Field DISTRIBUTION =
+      Field.optional("distribution", Distribution.class, "The table's distribution");
 
-  protected List<ColumnEntity> columns;
+  public static final Field INDEXES =
+      Field.optional("indexes", Index[].class, "The table's indexes");
+
+  public static final Field COMMENT =
+      Field.optional("comment", String.class, "The table's comment");
+
+  private Long id;
+
+  private String name;
+
+  private AuditInfo auditInfo;
+
+  private Namespace namespace;
+
+  private List<ColumnEntity> columns;
+
+  @Getter private String format;
+  @Getter private Map<String, String> properties;
+  @Getter private Transform[] partitions;
+  @Getter private SortOrder[] sortOrder;
+  @Getter private Distribution distribution;
+  @Getter private Index[] indexes;
+  @Getter private String comment;
 
   /**
    * Returns a map of the fields and their corresponding values for this table.
@@ -65,6 +98,14 @@ public class TableEntity implements Entity, Auditable, HasIdentifier {
     fields.put(AUDIT_INFO, auditInfo);
     fields.put(COLUMNS, columns);
 
+    fields.put(FORMAT, format);
+    fields.put(PROPERTIES, properties);
+    fields.put(PARTITIONS, partitions);
+    fields.put(SORT_ORDER, sortOrder);
+    fields.put(DISTRIBUTION, distribution);
+    fields.put(INDEXES, indexes);
+    fields.put(COMMENT, comment);
+
     return fields;
   }
 
@@ -136,7 +177,15 @@ public class TableEntity implements Entity, Auditable, HasIdentifier {
         && Objects.equal(name, baseTable.name)
         && Objects.equal(namespace, baseTable.namespace)
         && Objects.equal(auditInfo, baseTable.auditInfo)
-        && CollectionUtils.isEqualCollection(columns, baseTable.columns);
+        && CollectionUtils.isEqualCollection(columns, baseTable.columns)
+        && Objects.equal(format, baseTable.format)
+        // Please check the correctness of this comparison.
+        && Objects.equal(properties, baseTable.properties)
+        && Arrays.equals(partitions, baseTable.partitions)
+        && Arrays.equals(sortOrder, baseTable.sortOrder)
+        && Objects.equal(distribution, baseTable.distribution)
+        && Arrays.equals(indexes, baseTable.indexes)
+        && Objects.equal(comment, baseTable.comment);
   }
 
   @Override
@@ -177,6 +226,41 @@ public class TableEntity implements Entity, Auditable, HasIdentifier {
       return this;
     }
 
+    public Builder withFormat(String format) {
+      tableEntity.format = format;
+      return this;
+    }
+
+    public Builder withProperties(Map<String, String> properties) {
+      tableEntity.properties = properties;
+      return this;
+    }
+
+    public Builder withPartitions(Transform[] partitions) {
+      tableEntity.partitions = partitions;
+      return this;
+    }
+
+    public Builder withSortOrder(SortOrder[] sortOrder) {
+      tableEntity.sortOrder = sortOrder;
+      return this;
+    }
+
+    public Builder withDistribution(Distribution distribution) {
+      tableEntity.distribution = distribution;
+      return this;
+    }
+
+    public Builder withIndexes(Index[] indexes) {
+      tableEntity.indexes = indexes;
+      return this;
+    }
+
+    public Builder withComment(String comment) {
+      tableEntity.comment = comment;
+      return this;
+    }
+
     public TableEntity build() {
       tableEntity.validate();
 
@@ -184,6 +268,22 @@ public class TableEntity implements Entity, Auditable, HasIdentifier {
         tableEntity.columns = Collections.emptyList();
       }
 
+      if (tableEntity.properties == null) {
+        tableEntity.properties = Collections.emptyMap();
+      }
+
+      if (tableEntity.indexes == null) {
+        tableEntity.indexes = new Index[0];
+      }
+
+      if (tableEntity.partitions == null) {
+        tableEntity.partitions = new Transform[0];
+      }
+
+      if (tableEntity.sortOrder == null) {
+        tableEntity.sortOrder = new SortOrder[0];
+      }
+
       return tableEntity;
     }
   }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
index e919e539e9..0eb13e4a1e 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
@@ -56,9 +56,6 @@ public interface TableMetaMapper {
   TablePO selectTableMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name);
 
-  @SelectProvider(type = TableMetaSQLProviderFactory.class, method = "selectTableMetaById")
-  TablePO selectTableMetaById(@Param("tableId") Long tableId);
-
   @InsertProvider(type = TableMetaSQLProviderFactory.class, method = "insertTableMeta")
   void insertTableMeta(@Param("tableMeta") TablePO tablePO);
 
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
index a723c3db4a..16f1ad7a34 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
@@ -20,8 +20,10 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.DeleteProvider;
 import org.apache.ibatis.annotations.InsertProvider;
 import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.UpdateProvider;
 
 public interface TableVersionMapper {
   String TABLE_NAME = "table_version_info";
@@ -33,4 +35,16 @@ public interface TableVersionMapper {
       type = TableVersionSQLProviderFactory.class,
       method = "insertTableVersionOnDuplicateKeyUpdate")
   void insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO tablePO);
+
+  @UpdateProvider(
+      type = TableVersionSQLProviderFactory.class,
+      method = "softDeleteTableVersionByTableIdAndVersion")
+  void softDeleteTableVersionByTableIdAndVersion(
+      @Param("tableId") Long tableId, @Param("version") Long version);
+
+  @DeleteProvider(
+      type = TableVersionSQLProviderFactory.class,
+      method = "deleteTableVersionByLegacyTimeline")
+  Integer deleteTableVersionByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
index ab27353c00..4c518ef4bd 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
@@ -59,4 +59,14 @@ public class TableVersionSQLProviderFactory {
   public static String insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO tablePO) {
     return getProvider().insertTableVersionOnDuplicateKeyUpdate(tablePO);
   }
+
+  public static String softDeleteTableVersionByTableIdAndVersion(
+      @Param("tableId") Long tableId, @Param("version") Long version) {
+    return getProvider().softDeleteTableVersionByTableIdAndVersion(tableId, version);
+  }
+
+  public static String deleteTableVersionByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return getProvider().deleteTableVersionByLegacyTimeline(legacyTimeline, limit);
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 8065476a61..f8f116c5b3 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -48,15 +48,23 @@ public class TableMetaBaseSQLProvider {
 
   public String listTablePOsByTableIds(List<Long> tableIds) {
     return "<script>"
-        + " SELECT table_id as tableId, table_name as tableName,"
-        + " metalake_id as metalakeId, catalog_id as catalogId,"
-        + " schema_id as schemaId, audit_info as auditInfo,"
-        + " current_version as currentVersion, last_version as lastVersion,"
-        + " deleted_at as deletedAt"
+        + "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+        + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+        + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+        + " tm.current_version as currentVersion, tm.last_version as lastVersion,"
+        + " tm.deleted_at as deletedAt,"
+        + " tv.format as format, "
+        + " tv.properties as properties,"
+        + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+        + " tv.distribution as distribution, tv.indexes as indexes,"
+        + " tv.comment as comment"
         + " FROM "
         + TABLE_NAME
-        + " WHERE deleted_at = 0"
-        + " AND table_id IN ("
+        + " tm LEFT JOIN "
+        + TableVersionMapper.TABLE_NAME
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version AND tv.deleted_at = 0"
+        + " WHERE tm.deleted_at = 0"
+        + " AND tm.table_id IN ("
         + "<foreach collection='tableIds' item='tableId' separator=','>"
         + "#{tableId}"
         + "</foreach>"
@@ -93,14 +101,23 @@ public class TableMetaBaseSQLProvider {
   }
 
   public String selectTableMetaById(@Param("tableId") Long tableId) {
-    return "SELECT table_id as tableId, table_name as tableName,"
-        + " metalake_id as metalakeId, catalog_id as catalogId,"
-        + " schema_id as schemaId, audit_info as auditInfo,"
-        + " current_version as currentVersion, last_version as lastVersion,"
-        + " deleted_at as deletedAt"
+    return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+        + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+        + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+        + " tm.current_version as currentVersion, tm.last_version as lastVersion,"
+        + " tm.deleted_at as deletedAt,"
+        + " tv.format as format, "
+        + " tv.properties as properties,"
+        + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+        + " tv.distribution as distribution, tv.indexes as indexes,"
+        + " tv.comment as comment"
         + " FROM "
         + TABLE_NAME
-        + " WHERE table_id = #{tableId} AND deleted_at = 0";
+        + " tm LEFT JOIN "
+        + TableVersionMapper.TABLE_NAME
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version"
+        + " AND tv.deleted_at = 0"
+        + " WHERE tm.table_id = #{tableId} AND tm.deleted_at = 0";
   }
 
   public String insertTableMeta(@Param("tableMeta") TablePO tablePO) {
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
index c39b8cbabb..68bd28a63d 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -75,4 +75,20 @@ public class TableVersionBaseSQLProvider {
         + " version = #{tablePO.currentVersion},"
         + " deleted_at = #{tablePO.deletedAt}";
   }
+
+  public String softDeleteTableVersionByTableIdAndVersion(
+      @Param("tableId") Long tableId, @Param("version") Long version) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE table_id = #{tableId} AND version = #{version} AND deleted_at = 0";
+  }
+
+  public String deleteTableVersionByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return "DELETE FROM "
+        + TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}";
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
index 13eebeaa2c..11be90f345 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -45,7 +45,7 @@ public class TableVersionPostgreSQLProvider extends TableVersionBaseSQLProvider
         + " #{tablePO.currentVersion},"
         + " #{tablePO.deletedAt}"
         + " )"
-        + " ON CONFLICT (table_id, deleted_at) DO UPDATE SET"
+        + " ON CONFLICT (table_id, version, deleted_at) DO UPDATE SET"
         + " format = #{tablePO.format},"
         + " properties = #{tablePO.properties},"
         + " partitioning = #{tablePO.partitions},"
@@ -56,4 +56,13 @@ public class TableVersionPostgreSQLProvider extends 
TableVersionBaseSQLProvider
         + " version = #{tablePO.currentVersion},"
         + " deleted_at = #{tablePO.deletedAt}";
   }
+
+  @Override
+  public String softDeleteTableVersionByTableIdAndVersion(Long tableId, Long version) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = round(extract(epoch from(current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')) * 1000)"
+        + " WHERE table_id = #{tableId} AND version = #{version}";
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 7a42d95db6..bfbbdaf39f 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -136,12 +136,10 @@ public class TableMetaService {
               SessionUtils.doWithCommit(
                   TableVersionMapper.class,
                   mapper -> {
-                    if (po.getFormat() != null) {
-                      if (overwrite) {
-                        mapper.insertTableVersionOnDuplicateKeyUpdate(po);
-                      } else {
-                        mapper.insertTableVersion(po);
-                      }
+                    if (overwrite) {
+                      mapper.insertTableVersionOnDuplicateKeyUpdate(po);
+                    } else {
+                      mapper.insertTableVersion(po);
                     }
                   }),
           () -> {
@@ -196,11 +194,8 @@ public class TableMetaService {
                 .getParentEntityIdByNamespace(newTableEntity.namespace())
             : schemaId;
 
-    boolean isColumnChanged =
-        TableColumnMetaService.getInstance().isColumnUpdated(oldTableEntity, newTableEntity);
     TablePO newTablePO =
-        POConverters.updateTablePOWithVersionAndSchemaId(
-            oldTablePO, newTableEntity, isColumnChanged, newSchemaId);
+        POConverters.updateTablePOWithVersionAndSchemaId(oldTablePO, newTableEntity, newSchemaId);
 
     final AtomicInteger updateResult = new AtomicInteger(0);
     try {
@@ -214,12 +209,12 @@ public class TableMetaService {
               SessionUtils.doWithCommit(
                   TableVersionMapper.class,
                   mapper -> {
-                    if (newTablePO.getFormat() != null) {
-                      mapper.insertTableVersionOnDuplicateKeyUpdate(newTablePO);
-                    }
+                    mapper.softDeleteTableVersionByTableIdAndVersion(
+                        oldTablePO.getTableId(), oldTablePO.getCurrentVersion());
+                    mapper.insertTableVersionOnDuplicateKeyUpdate(newTablePO);
                   }),
           () -> {
-            if (updateResult.get() > 0 && (isColumnChanged || isSchemaChanged)) {
+            if (updateResult.get() > 0) {
               TableColumnMetaService.getInstance()
                   .updateColumnPOsFromTableDiff(oldTableEntity, newTableEntity, newTablePO);
             }
@@ -250,7 +245,11 @@ public class TableMetaService {
     Long tableId = getTableIdBySchemaIdAndName(schemaId, tableName);
 
     AtomicInteger deleteResult = new AtomicInteger(0);
+    TablePO[] tablePOHolder = new TablePO[1];
     SessionUtils.doMultipleWithCommit(
+        () -> {
+          tablePOHolder[0] = getTablePOBySchemaIdAndName(schemaId, tableName);
+        },
         () ->
             deleteResult.set(
                 SessionUtils.getWithoutCommit(
@@ -284,6 +283,11 @@ public class TableMetaService {
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
                 mapper -> mapper.softDeletePolicyMetadataObjectRelsByTableId(tableId));
+            SessionUtils.doWithCommit(
+                TableVersionMapper.class,
+                mapper ->
+                    mapper.softDeleteTableVersionByTableIdAndVersion(
+                        tableId, tablePOHolder[0].getCurrentVersion()));
           }
         });
 
@@ -295,8 +299,18 @@ public class TableMetaService {
       baseMetricName = "deleteTableMetasByLegacyTimeline")
   public int deleteTableMetasByLegacyTimeline(Long legacyTimeline, int limit) {
     return SessionUtils.doWithCommitAndFetchResult(
-        TableMetaMapper.class,
-        mapper -> mapper.deleteTableMetasByLegacyTimeline(legacyTimeline, limit));
+            TableMetaMapper.class,
+            mapper -> mapper.deleteTableMetasByLegacyTimeline(legacyTimeline, limit))
+        + deleteTableVersionByLegacyTimeline(legacyTimeline, limit);
+  }
+
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "deleteTableVersionByLegacyTimeline")
+  public int deleteTableVersionByLegacyTimeline(Long legacyTimeline, int limit) {
+    return SessionUtils.doWithCommitAndFetchResult(
+        TableVersionMapper.class,
+        mapper -> mapper.deleteTableVersionByLegacyTimeline(legacyTimeline, limit));
   }
 
   private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, Namespace namespace) {
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index fa8f06a9f8..48cf8cc00e 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -45,7 +45,6 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
-import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
@@ -398,37 +397,19 @@ public class POConverters {
           .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.auditInfo()))
           .withCurrentVersion(INIT_VERSION)
           .withLastVersion(INIT_VERSION)
-          .withDeletedAt(DEFAULT_DELETED_AT);
-
-      if (tableEntity instanceof GenericTableEntity genericTable) {
-        builder.withFormat(genericTable.getFormat());
-        builder.withComment(genericTable.getComment());
-        builder.withProperties(
-            genericTable.getProperties() == null
-                ? null
-                : JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
-
-        // TODO store the following information to databases;
-        /**
-         * builder.withDistribution( genericTable.getDistribution() == null ? null :
-         * JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getDistribution()));
-         * builder.withPartitions( genericTable.getPartitions() == null ? null :
-         * JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getPartitions()));
-         */
-        builder.withIndexes(
-            genericTable.getIndexes() == null
-                ? null
-                : JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
-        builder.withProperties(
-            genericTable.getProperties() == null
-                ? null
-                : JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
-        builder.withSortOrders(
-            genericTable.getSortOrder() == null
-                ? null
-                : JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getSortOrder()));
-      }
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .withFormat(tableEntity.getFormat())
+          .withComment(tableEntity.getComment())
+          .withProperties(
+              tableEntity.getProperties() == null
+                  ? null
+                  : JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.getProperties()))
+          .withIndexes(
+              tableEntity.getIndexes() == null
+                  ? null
+                  : JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.getIndexes()));
 
+      // TODO, handle these fields(distribution, sort order, partition) later
       return builder.build();
     } catch (JsonProcessingException e) {
       throw new RuntimeException("Failed to serialize json object:", e);
@@ -440,21 +421,15 @@ public class POConverters {
    *
    * @param oldTablePO the old TablePO object
    * @param newTable the new TableEntity object
-   * @param needUpdateVersion whether need to update the version
    * @param newSchemaId the new schema id
    * @return TablePO object with updated version
    */
   public static TablePO updateTablePOWithVersionAndSchemaId(
-      TablePO oldTablePO, TableEntity newTable, boolean needUpdateVersion, Long newSchemaId) {
+      TablePO oldTablePO, TableEntity newTable, Long newSchemaId) {
     Long lastVersion;
     Long currentVersion;
-    if (needUpdateVersion) {
-      lastVersion = oldTablePO.getLastVersion() + 1;
-      currentVersion = lastVersion;
-    } else {
-      lastVersion = oldTablePO.getLastVersion();
-      currentVersion = oldTablePO.getCurrentVersion();
-    }
+    lastVersion = oldTablePO.getLastVersion() + 1;
+    currentVersion = lastVersion;
 
     try {
       TablePO.Builder builder =
@@ -467,23 +442,18 @@ public class POConverters {
               .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo()))
               .withCurrentVersion(currentVersion)
               .withLastVersion(lastVersion)
-              .withDeletedAt(DEFAULT_DELETED_AT);
-
-      // Note: GenericTableEntity will be removed in the refactor PR, so here just keep the old
-      // logic to make the UT pass.
-      if (newTable instanceof GenericTableEntity genericTable) {
-        builder.withFormat(genericTable.getFormat());
-        builder.withComment(genericTable.getComment());
-        builder.withProperties(
-            genericTable.getProperties() == null
-                ? null
-                : JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
-        builder.withIndexes(
-            genericTable.getIndexes() == null
-                ? null
-                : JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
-        // TODO other fields in the refactor PRs.
-      }
+              .withDeletedAt(DEFAULT_DELETED_AT)
+              .withComment(newTable.getComment())
+              .withProperties(
+                  newTable.getProperties() == null
+                      ? null
+                      : JsonUtils.anyFieldMapper().writeValueAsString(newTable.getProperties()))
+              .withIndexes(
+                  newTable.getIndexes() == null
+                      ? null
+                      : JsonUtils.anyFieldMapper().writeValueAsString(newTable.getIndexes()))
+              .withFormat(newTable.getFormat());
+      // TODO other fields(partitioning, distribution, sortorder) in the refactor PRs.
 
       return builder.build();
     } catch (JsonProcessingException e) {
@@ -505,29 +475,6 @@ public class POConverters {
   public static TableEntity fromTableAndColumnPOs(
       TablePO tablePO, List<ColumnPO> columnPOs, Namespace namespace) {
     try {
-      if (tablePO.getFormat() != null) {
-        return GenericTableEntity.getBuilder()
-            .withId(tablePO.getTableId())
-            .withName(tablePO.getTableName())
-            .withNamespace(namespace)
-            .withColumns(fromColumnPOs(columnPOs))
-            .withAuditInfo(
-                JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(), AuditInfo.class))
-            // TODO add field partition, distribution and sort order;
-            .withIndexes(
-                StringUtils.isBlank(tablePO.getIndexes())
-                    ? null
-                    : JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(), IndexImpl[].class))
-            .withFormat(tablePO.getFormat())
-            .withComment(tablePO.getComment())
-            .withProperties(
-                StringUtils.isBlank(tablePO.getProperties())
-                    ? null
-                    : JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
-            .withColumns(fromColumnPOs(columnPOs))
-            .build();
-      }
-
       return TableEntity.builder()
           .withId(tablePO.getTableId())
           .withName(tablePO.getTableName())
@@ -535,6 +482,18 @@ public class POConverters {
           .withColumns(fromColumnPOs(columnPOs))
           .withAuditInfo(
               JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(), AuditInfo.class))
+          // TODO add field partition, distribution and sort order;
+          .withIndexes(
+              StringUtils.isBlank(tablePO.getIndexes())
+                  ? null
+                  : JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(), IndexImpl[].class))
+          .withFormat(tablePO.getFormat())
+          .withComment(tablePO.getComment())
+          .withProperties(
+              StringUtils.isBlank(tablePO.getProperties())
+                  ? null
+                  : JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
+          .withColumns(fromColumnPOs(columnPOs))
           .build();
     } catch (JsonProcessingException e) {
       throw new RuntimeException("Failed to deserialize json object:", e);
diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
index 5b83a0e22b..73f8a146af 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
@@ -98,6 +98,7 @@ public abstract class TestOperationDispatcher {
     entityStore.put(metalakeEntity, true);
 
     catalogManager = new CatalogManager(config, entityStore, idGenerator);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
 
     Config config = mock(Config.class);
     doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
diff --git a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
index c67dae6dd1..92c0e16131 100644
--- a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
@@ -52,6 +52,7 @@ import org.apache.gravitino.catalog.TestOperationDispatcher;
 import org.apache.gravitino.catalog.TestTableOperationDispatcher;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
@@ -95,7 +96,13 @@ public class TestTableHookDispatcher extends 
TestOperationDispatcher {
     catalogManager = Mockito.mock(CatalogManager.class);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
     BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalog.capability()).thenReturn(Capability.DEFAULT);
+    CatalogManager.CatalogWrapper catalogWrapper =
+        Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(catalogWrapper.catalog()).thenReturn(catalog);
+
     Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
     authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
     
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
   }
diff --git a/core/src/test/java/org/apache/gravitino/meta/TestEntity.java b/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
index 8bff5ce966..9ac22212be 100644
--- a/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
+++ b/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
@@ -31,6 +31,14 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -152,13 +160,38 @@ public class TestEntity {
 
   @Test
   public void testTable() {
+    String format = "parquet";
+    String comment = "test table comment";
+    Map<String, String> tableProperties = ImmutableMap.of("tableKey1", 
"tableValue1");
+    SortOrder[] sortOrders =
+        new SortOrder[] {SortOrders.of(NamedReference.field("col1"), 
SortDirection.ASCENDING)};
+    Index[] indexes =
+        new Index[] {Indexes.of(Index.IndexType.BTREE, "idx1", new String[][] 
{{"col1"}})};
+    Distribution distribution = Distributions.hash(4, 
NamedReference.field("col1"));
+
     TableEntity testTable =
-        
TableEntity.builder().withId(tableId).withName(tableName).withAuditInfo(auditInfo).build();
+        TableEntity.builder()
+            .withId(tableId)
+            .withName(tableName)
+            .withAuditInfo(auditInfo)
+            .withFormat(format)
+            .withSortOrder(sortOrders)
+            .withProperties(tableProperties)
+            .withComment(comment)
+            .withIndexes(indexes)
+            .withDistribution(distribution)
+            .build();
 
     Map<Field, Object> fields = testTable.fields();
     Assertions.assertEquals(tableId, fields.get(TableEntity.ID));
     Assertions.assertEquals(tableName, fields.get(TableEntity.NAME));
     Assertions.assertEquals(auditInfo, fields.get(TableEntity.AUDIT_INFO));
+    Assertions.assertEquals(format, fields.get(TableEntity.FORMAT));
+    Assertions.assertEquals(tableProperties, 
fields.get(TableEntity.PROPERTIES));
+    Assertions.assertEquals(comment, fields.get(TableEntity.COMMENT));
+    Assertions.assertEquals(sortOrders, fields.get(TableEntity.SORT_ORDER));
+    Assertions.assertEquals(indexes, fields.get(TableEntity.INDEXES));
+    Assertions.assertEquals(distribution, 
fields.get(TableEntity.DISTRIBUTION));
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index ce9f0f5c73..ad9cd173bc 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -79,7 +79,6 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
-import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
@@ -2686,8 +2685,8 @@ public class TestEntityStorage {
       store.put(schemaEntity, false);
 
       long column1Id = RandomIdGenerator.INSTANCE.nextId();
-      GenericTableEntity table =
-          GenericTableEntity.getBuilder()
+      TableEntity table =
+          TableEntity.builder()
               .withId(RandomIdGenerator.INSTANCE.nextId())
               .withNamespace(NamespaceUtil.ofTable("metalake", "catalog", 
"schema"))
               .withName("table")
@@ -2707,8 +2706,8 @@ public class TestEntityStorage {
               .withProperties(ImmutableMap.of("location", "/tmp/test", 
"format", "lance"))
               .build();
       store.put(table, false);
-      GenericTableEntity fetchedTable =
-          store.get(table.nameIdentifier(), Entity.EntityType.TABLE, 
GenericTableEntity.class);
+      TableEntity fetchedTable =
+          store.get(table.nameIdentifier(), Entity.EntityType.TABLE, 
TableEntity.class);
 
       // check table properties
       Assertions.assertEquals("/tmp/test", 
fetchedTable.getProperties().get("location"));
@@ -2718,8 +2717,8 @@ public class TestEntityStorage {
       Assertions.assertEquals("column1", fetchedTable.columns().get(0).name());
 
       // Now try to update the table
-      GenericTableEntity updatedTable =
-          GenericTableEntity.getBuilder()
+      TableEntity updatedTable =
+          TableEntity.builder()
               .withId(table.id())
               .withNamespace(table.namespace())
               .withName(table.name())
@@ -2748,12 +2747,9 @@ public class TestEntityStorage {
               .build();
 
       store.update(
-          table.nameIdentifier(),
-          GenericTableEntity.class,
-          Entity.EntityType.TABLE,
-          e -> updatedTable);
-      GenericTableEntity fetchedUpdatedTable =
-          store.get(table.nameIdentifier(), Entity.EntityType.TABLE, 
GenericTableEntity.class);
+          table.nameIdentifier(), TableEntity.class, Entity.EntityType.TABLE, 
e -> updatedTable);
+      TableEntity fetchedUpdatedTable =
+          store.get(table.nameIdentifier(), Entity.EntityType.TABLE, 
TableEntity.class);
 
       // check updated table properties
       Assertions.assertEquals(
@@ -2772,6 +2768,11 @@ public class TestEntityStorage {
               .filter(c -> c.name().equals("column2"))
               .findFirst()
               .isPresent());
+
+      // Test drop the table
+      Assertions.assertTrue(store.delete(table.nameIdentifier(), 
Entity.EntityType.TABLE));
+      Assertions.assertFalse(store.exists(table.nameIdentifier(), 
Entity.EntityType.TABLE));
+
       destroy(type);
     } catch (IOException e) {
       throw new RuntimeException(e);
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index a552598709..b2916cfcaf 100644
--- a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -61,6 +61,7 @@ import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -1195,6 +1196,128 @@ public class TestJDBCBackend {
     assertEquals(1, countActiveOwnerRel(user.id()));
   }
 
+  @Test
+  void testUpdateAndDropLanceTable() throws IOException, InterruptedException {
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    String metalakeName = "metalake" + RandomIdGenerator.INSTANCE.nextId();
+    BaseMetalake metalake =
+        BaseMetalake.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName(metalakeName)
+            .withAuditInfo(auditInfo)
+            .withComment(null)
+            .withProperties(null)
+            .withVersion(SchemaVersion.V_0_1)
+            .build();
+    backend.insert(metalake, false);
+
+    CatalogEntity catalog =
+        CatalogEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withNamespace(NamespaceUtil.ofCatalog(metalakeName))
+            .withName("catalog")
+            .withAuditInfo(auditInfo)
+            .withComment(null)
+            .withProperties(null)
+            .withType(Catalog.Type.RELATIONAL)
+            .withProvider("generic-lakehouse")
+            .build();
+
+    backend.insert(catalog, false);
+
+    SchemaEntity schema =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withNamespace(NamespaceUtil.ofSchema(metalakeName, 
catalog.name()))
+            .withName("schema")
+            .withAuditInfo(auditInfo)
+            .withComment(null)
+            .withProperties(null)
+            .build();
+
+    backend.insert(schema, false);
+
+    TableEntity table =
+        TableEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withNamespace(NamespaceUtil.ofTable(metalakeName, catalog.name(), 
schema.name()))
+            .withName("table")
+            .withAuditInfo(auditInfo)
+            .withFormat("lance")
+            .withComment(null)
+            .withProperties(ImmutableMap.of("format", "LANCE", "location", 
"/tmp/test/lance"))
+            .build();
+
+    backend.insert(table, false);
+
+    TableEntity fetchedTable = backend.get(table.nameIdentifier(), 
Entity.EntityType.TABLE);
+    Assertions.assertEquals("LANCE", 
fetchedTable.getProperties().get("format"));
+
+    TableEntity updatedTable =
+        TableEntity.builder()
+            .withId(table.id())
+            .withNamespace(table.namespace())
+            .withName(table.name())
+            .withAuditInfo(auditInfo)
+            .withComment("update comment")
+            .withProperties(ImmutableMap.of("format", "LANCE", "location", 
"/tmp/test/lance"))
+            .build();
+
+    backend.update(table.nameIdentifier(), EntityType.TABLE, e -> 
updatedTable);
+    Thread.sleep(1000);
+    long now = System.currentTimeMillis();
+
+    // For update lance table, we will do the following things
+    // 1. Update info in table table_meta;
+    // 2. Soft drop info in the table table_version_info with an old version.
+    // 3. Insert new version info in the table table_version_info.
+    // Let us check it.
+    int tableInfoCount =
+        countTableVersionInfoRow(
+            String.format(
+                "SELECT count(*) FROM table_version_info WHERE table_id = %d 
AND deleted_at = 0",
+                table.id()));
+    Assertions.assertEquals(1, tableInfoCount);
+    int tableInfoDeletedCount =
+        countTableVersionInfoRow(
+            String.format(
+                "SELECT count(*) FROM table_version_info WHERE table_id = %d 
AND deleted_at != 0",
+                table.id()));
+    Assertions.assertEquals(1, tableInfoDeletedCount);
+
+    // Now try to clean up old version data
+    int deletedCount = backend.hardDeleteLegacyData(Entity.EntityType.TABLE, 
now);
+    // Only one old version data should be cleaned up in table version info 
table.
+    Assertions.assertEquals(1, deletedCount);
+
+    // Try to drop the table
+    backend.delete(table.nameIdentifier(), Entity.EntityType.TABLE, false);
+
+    Thread.sleep(1000);
+    now = System.currentTimeMillis();
+    // After drop, there should be one more deleted record in 
table_version_info
+    int deleteCountNow = backend.hardDeleteLegacyData(Entity.EntityType.TABLE, 
now);
+    // One row from table_meta and one row from table_version_info should be 
cleaned up.
+    Assertions.assertEquals(2, deleteCountNow);
+  }
+
+  private int countTableVersionInfoRow(String sql) {
+    try (SqlSession sqlSession =
+            
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet rs = statement.executeQuery(sql)) {
+      while (rs.next()) {
+        return rs.getInt(1);
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("SQL execution failed", e);
+    }
+    return 0;
+  }
+
   private boolean legacyRecordExistsInDB(Long id, Entity.EntityType 
entityType) {
     String tableName;
     String idColumnName;
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index a2d8a259da..db9d0cce86 100644
--- a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -694,8 +694,7 @@ public class TestPOConverters {
     TablePO.Builder builder =
         
TablePO.builder().withMetalakeId(1L).withCatalogId(1L).withSchemaId(1L);
     TablePO initPO = POConverters.initializeTablePOWithVersion(tableEntity, 
builder);
-    TablePO updatePO =
-        POConverters.updateTablePOWithVersionAndSchemaId(initPO, updatedTable, 
false, 1L);
+    TablePO updatePO = 
POConverters.updateTablePOWithVersionAndSchemaId(initPO, updatedTable, 1L);
     assertEquals(1, initPO.getCurrentVersion());
     assertEquals(1, initPO.getLastVersion());
     assertEquals(0, initPO.getDeletedAt());
diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
index 493d0acace..4c3b96ce9a 100644
--- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
+++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
@@ -163,6 +163,8 @@ public class LanceNamespaceOperations {
 
   @GET
   @Path("{id}/table/list")
+  @Timed(name = "list-tables." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
+  @ResponseMetered(name = "list-tables", absolute = true)
   public Response listTables(
       @PathParam("id") String namespaceId,
       @DefaultValue("$") @QueryParam("delimiter") String delimiter,
diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 359fc94c42..dd10a31534 100644
--- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.lance.service.rest;
 
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.lancedb.lance.namespace.model.CreateTableResponse;
 import com.lancedb.lance.namespace.model.DescribeTableResponse;
@@ -37,6 +39,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
 import org.apache.gravitino.lance.service.LanceExceptionMapper;
+import org.apache.gravitino.metrics.MetricNames;
 
 @Path("/v1/table/{id}")
 @Consumes(MediaType.APPLICATION_JSON)
@@ -52,6 +55,8 @@ public class LanceTableOperations {
 
   @POST
   @Path("/describe")
+  @Timed(name = "describe-table." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "describe-table", absolute = true)
   public Response describeTable(
       @PathParam("id") String tableId,
       @DefaultValue("$") @QueryParam("delimiter") String delimiter) {
@@ -68,6 +73,8 @@ public class LanceTableOperations {
   @Path("/create")
   @Consumes("application/vnd.apache.arrow.stream")
   @Produces("application/json")
+  @Timed(name = "create-table." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
+  @ResponseMetered(name = "create-table", absolute = true)
   public Response createTable(
       @PathParam("id") String tableId,
       @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
@@ -92,6 +99,8 @@ public class LanceTableOperations {
 
   @POST
   @Path("/create-empty")
+  @Timed(name = "create-empty-table." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "create-empty-table", absolute = true)
   public Response createEmptyTable(
       @PathParam("id") String tableId,
       @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
diff --git a/scripts/h2/schema-1.1.0-h2.sql b/scripts/h2/schema-1.1.0-h2.sql
index 6172915f1f..2a8db7fa07 100644
--- a/scripts/h2/schema-1.1.0-h2.sql
+++ b/scripts/h2/schema-1.1.0-h2.sql
@@ -432,7 +432,7 @@ CREATE TABLE IF NOT EXISTS `job_run_meta` (
 
 CREATE TABLE IF NOT EXISTS `table_version_info` (
     `table_id`        BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
-    `format`          VARCHAR(64) NOT NULL COMMENT 'table format, such as 
Lance, Iceberg and so on',
+    `format`          VARCHAR(64) COMMENT 'table format, such as Lance, 
Iceberg and so on, it will be null if it is not a lakehouse table' ,
     `properties`      CLOB DEFAULT NULL COMMENT 'table properties',
     `partitioning`  CLOB DEFAULT NULL COMMENT 'table partition info',
     `distribution` CLOB DEFAULT NULL COMMENT 'table distribution info',
@@ -441,6 +441,5 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
     `comment`   CLOB DEFAULT NULL COMMENT 'table comment',
     `version` BIGINT(20) UNSIGNED COMMENT 'table current version',
     `deleted_at`      BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion 
timestamp, 0 means not deleted',
-    PRIMARY KEY (table_id),
-    UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `deleted_at`)
+    UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`, 
`deleted_at`)
 ) ENGINE=InnoDB COMMENT 'table detail information including format, location, 
properties, partition, distribution, sort order, index and so on';
\ No newline at end of file
diff --git a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
index cf42a02b57..2274b64ab0 100644
--- a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
+++ b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
@@ -19,7 +19,7 @@
 
 CREATE TABLE IF NOT EXISTS `table_version_info` (
     `table_id`        BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
-    `format`          VARCHAR(64) NOT NULL COMMENT 'table format, such as 
Lance, Iceberg and so on',
+    `format`          VARCHAR(64) COMMENT 'table format, such as Lance, 
Iceberg and so on, it will be null if it is not a lakehouse table',
     `properties`      CLOB DEFAULT NULL COMMENT 'table properties',
     `partitioning`  MEDIUMTEXT DEFAULT NULL COMMENT 'table partition info',
     `distribution` MEDIUMTEXT DEFAULT NULL COMMENT 'table distribution info',
@@ -28,6 +28,5 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
     `comment`   MEDIUMTEXT DEFAULT NULL COMMENT 'table comment',
     `version` BIGINT(20) UNSIGNED COMMENT 'table current version',
     `deleted_at`      BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion 
timestamp, 0 means not deleted',
-    PRIMARY KEY (table_id),
-    UNIQUE KEY `uk_table_id_deleted_at` (`table_id`, `deleted_at`)
+    UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`, 
`deleted_at`)
 ) ENGINE=InnoDB COMMENT 'table detail information including format, location, 
properties, partition, distribution, sort order, index and so on';
\ No newline at end of file
diff --git a/scripts/mysql/schema-1.1.0-mysql.sql b/scripts/mysql/schema-1.1.0-mysql.sql
index ca9b351b03..5659d71ea2 100644
--- a/scripts/mysql/schema-1.1.0-mysql.sql
+++ b/scripts/mysql/schema-1.1.0-mysql.sql
@@ -423,7 +423,7 @@ CREATE TABLE IF NOT EXISTS `job_run_meta` (
 
 CREATE TABLE IF NOT EXISTS `table_version_info` (
     `table_id`        BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
-    `format`          VARCHAR(64) NOT NULL COMMENT 'table format, such as Lance, Iceberg and so on',
+    `format`          VARCHAR(64) COMMENT 'table format, such as Lance, Iceberg and so on, it will be null if it is not a lakehouse table',
     `properties`      MEDIUMTEXT DEFAULT NULL COMMENT 'table properties',
     `partitioning`  MEDIUMTEXT DEFAULT NULL COMMENT 'table partition info',
     `distribution` MEDIUMTEXT DEFAULT NULL COMMENT 'table distribution info',
@@ -432,7 +432,6 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
     `comment`   MEDIUMTEXT DEFAULT NULL COMMENT 'table comment',
     `version` BIGINT(20) UNSIGNED COMMENT 'table current version',
     `deleted_at`      BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion timestamp, 0 means not deleted',
-     PRIMARY KEY (table_id),
-     UNIQUE KEY `uk_table_id_deleted_at` (`table_id`, `deleted_at`)
+    UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`, `deleted_at`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table detail information including format, location, properties, partition, distribution, sort order, index and so on';
 
diff --git a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
index 6663150f15..a883bafcd3 100644
--- a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
+++ b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
@@ -19,7 +19,7 @@
 
 CREATE TABLE IF NOT EXISTS `table_version_info` (
     `table_id`        BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
-    `format`          VARCHAR(64) NOT NULL COMMENT 'table format, such as Lance, Iceberg and so on',
+    `format`          VARCHAR(64) COMMENT 'table format, such as Lance, Iceberg and so on, it will be null if it is not a lakehouse table',
     `properties`      MEDIUMTEXT DEFAULT NULL COMMENT 'table properties',
     `partitioning`  MEDIUMTEXT DEFAULT NULL COMMENT 'table partition info',
     `distribution` MEDIUMTEXT DEFAULT NULL COMMENT 'table distribution info',
@@ -28,6 +28,5 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
     `comment`   MEDIUMTEXT DEFAULT NULL COMMENT 'table comment',
     `version` BIGINT(20) UNSIGNED COMMENT 'table current version',
     `deleted_at`      BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion timestamp, 0 means not deleted',
-    PRIMARY KEY (table_id),
-    UNIQUE KEY `uk_table_id_deleted_at` (`table_id`, `deleted_at`)
+    UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`, `deleted_at`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table detail information including format, location, properties, partition, distribution, sort order, index and so on';
\ No newline at end of file
diff --git a/scripts/postgresql/schema-1.1.0-postgresql.sql b/scripts/postgresql/schema-1.1.0-postgresql.sql
index c5bc6b3205..2f3bf16fab 100644
--- a/scripts/postgresql/schema-1.1.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.1.0-postgresql.sql
@@ -750,8 +750,8 @@ COMMENT ON COLUMN job_run_meta.last_version IS 'job run last version';
 COMMENT ON COLUMN job_run_meta.deleted_at IS 'job run deleted at';
 
 CREATE TABLE IF NOT EXISTS table_version_info (
-    table_id        BIGINT PRIMARY KEY,
-    format          VARCHAR(64) NOT NULL,
+    table_id        BIGINT NOT NULL,
+    format          VARCHAR(64),
     properties      TEXT,
     partitioning  TEXT,
     distribution TEXT,
@@ -760,11 +760,11 @@ CREATE TABLE IF NOT EXISTS table_version_info (
     "comment"   TEXT,
     version BIGINT,
     deleted_at      BIGINT DEFAULT 0,
-    UNIQUE (table_id, deleted_at)
+    UNIQUE (table_id, version, deleted_at)
 );
 COMMENT ON TABLE table_version_info                  IS 'table detail information including format, location, properties, partition, distribution, sort order, index and so on';
 COMMENT ON COLUMN table_version_info.table_id        IS 'table id';
-COMMENT ON COLUMN table_version_info.format          IS 'table format, such as Lance, Iceberg and so on';
+COMMENT ON COLUMN table_version_info.format          IS 'table format, such as Lance, Iceberg and so on, it will be null if it is not a lakehouse table';
 COMMENT ON COLUMN table_version_info.properties      IS 'table properties';
 COMMENT ON COLUMN table_version_info.partitioning      IS 'table partition info';
 COMMENT on COLUMN table_version_info.distribution    IS 'table distribution info';
diff --git a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
index 42d06e30a8..80c279af57 100644
--- a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
@@ -19,8 +19,8 @@
 
 
 CREATE TABLE IF NOT EXISTS table_version_info (
-    table_id        BIGINT PRIMARY KEY,
-    format          VARCHAR(64) NOT NULL,
+    table_id        BIGINT NOT NULL,
+    format          VARCHAR(64),
     properties      TEXT,
     partitions  TEXT,
     distribution TEXT,
@@ -29,11 +29,11 @@ CREATE TABLE IF NOT EXISTS table_version_info (
     "comment"   TEXT,
     version BIGINT,
     deleted_at      BIGINT DEFAULT 0,
-    UNIQUE (table_id, deleted_at)
+    UNIQUE (table_id, version, deleted_at)
 );
 COMMENT ON TABLE table_version_info                  IS 'table detail information including format, location, properties, partition, distribution, sort order, index and so on';
 COMMENT ON COLUMN table_version_info.table_id        IS 'table id';
-COMMENT ON COLUMN table_version_info.format          IS 'table format, such as Lance, Iceberg and so on';
+COMMENT ON COLUMN table_version_info.format          IS 'table format, such as Lance, Iceberg and so on, it will be null if it is not a lakehouse table';
 COMMENT ON COLUMN table_version_info.properties      IS 'table properties';
 COMMENT ON COLUMN table_version_info.partitions      IS 'table partition info';
 COMMENT on COLUMN table_version_info.distribution    IS 'table distribution info';

Reply via email to