This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new c206c0748a [#9074] improve(catalog-lakehouse-generic): Refactor the code structure of the generic lakehouse catalog (#9160)
c206c0748a is described below

commit c206c0748a9e6b0a96b9eb68e593ef87df3d8e23
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Nov 26 11:22:05 2025 +0800

    [#9074] improve(catalog-lakehouse-generic): Refactor the code structure of the generic lakehouse catalog (#9160)
    
    ### What changes were proposed in this pull request?
    
    Refactor the code of lakehouse-generic catalog.
    
    ### Why are the changes needed?
    
    Fix: #9074
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs.
---
 .../main/java/org/apache/gravitino/Catalog.java    |  14 +
 api/src/main/java/org/apache/gravitino/Schema.java |  13 +
 build.gradle.kts                                   |   2 +-
 .../GenericLakehouseCatalogOperations.java         | 487 ---------------------
 .../GenericLakehouseTablePropertiesMetadata.java   |  81 ----
 .../lakehouse/LakehouseCatalogOperations.java      |  39 --
 .../catalog/lakehouse/LakehouseTableFormat.java    |  41 --
 .../lakehouse/lance/LanceCatalogOperations.java    | 299 -------------
 .../catalog/lakehouse/utils/EntityConverter.java   |  42 --
 .../lakehouse/utils/TestEntityConverter.java       |  78 ----
 .../build.gradle.kts                               |   7 +-
 .../catalog/lakehouse/generic/GenericCatalog.java} |  27 +-
 .../generic/GenericCatalogCapability.java}         |  13 +-
 .../generic/GenericCatalogOperations.java          | 363 +++++++++++++++
 .../generic/GenericCatalogPropertiesMetadata.java} |  21 +-
 .../generic/GenericSchemaPropertiesMetadata.java}  |  21 +-
 .../generic/GenericTablePropertiesMetadata.java}   |  39 +-
 .../lakehouse/generic/LakehouseTableDelegator.java |  60 +++
 .../generic/LakehouseTableDelegatorFactory.java    |  65 +++
 .../lakehouse/lance/LanceTableDelegator.java       |  68 +++
 .../lakehouse/lance/LanceTableOperations.java      | 282 ++++++++++++
 .../services/org.apache.gravitino.CatalogProvider  |   2 +-
 ...alog.lakehouse.generic.LakehouseTableDelegator} |   2 +-
 .../src/main/resources/lakehouse-generic.conf}     |   0
 .../generic/TestGenericCatalogOperations.java}     |   8 +-
 .../lakehouse/generic}/TestPropertiesMetadata.java |  52 +--
 .../test/CatalogGenericCatalogLanceIT.java}        |  14 +-
 .../catalog/TableOperationDispatcher.java          | 123 ++----
 .../connector/GenericLakehouseColumn.java          |  57 ---
 .../gravitino/connector/GenericLakehouseTable.java |  69 ---
 lance/lance-common/build.gradle.kts                |  10 +
 .../GravitinoLanceNameSpaceOperations.java         |   2 +-
 .../gravitino/GravitinoLanceNamespaceWrapper.java  |   2 +-
 .../gravitino/GravitinoLanceTableOperations.java   |   3 +-
 lance/lance-rest-server/build.gradle.kts           |  12 +-
 .../lance/integration/test/LanceRESTServiceIT.java |  14 +-
 settings.gradle.kts                                |   2 +-
 37 files changed, 1027 insertions(+), 1407 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/Catalog.java b/api/src/main/java/org/apache/gravitino/Catalog.java
index 9fbf9f3ac5..680c4c6b1a 100644
--- a/api/src/main/java/org/apache/gravitino/Catalog.java
+++ b/api/src/main/java/org/apache/gravitino/Catalog.java
@@ -132,6 +132,20 @@ public interface Catalog extends Auditable {
   /** The property indicates the catalog is in use. */
   String PROPERTY_IN_USE = "in-use";
 
+  /**
+   * The property name for the catalog location. This property indicates the physical location of
+   * the catalog's data, such as a file path or a URI.
+   *
+   * <p>The location property is optional, it can be specified when creating the catalog.
+   *
+   * <p>It depends on the catalog implementation to decide whether to leverage this property. It
+   * also depends on the catalog implementation to decide whether to allow altering this property
+   * after catalog creation.
+   *
+   * <p>The behavior of altering this property (moving the catalog data) is also catalog specific.
+   */
+  String PROPERTY_LOCATION = "location";
+
   /**
    * The property to specify the cloud that the catalog is running on. The value should be one of
    * the {@link CloudName}.
diff --git a/api/src/main/java/org/apache/gravitino/Schema.java b/api/src/main/java/org/apache/gravitino/Schema.java
index 292ed59e52..c42d760011 100644
--- a/api/src/main/java/org/apache/gravitino/Schema.java
+++ b/api/src/main/java/org/apache/gravitino/Schema.java
@@ -37,6 +37,19 @@ import org.apache.gravitino.tag.SupportsTags;
 @Evolving
 public interface Schema extends Auditable {
 
+  /**
+   * The property name for the location of the schema. This property indicates the physical storage
+   * location of the schema, such as a directory path in a file system or a URI.
+   *
+   * <p>This property is optional, and it can be specified during schema creation.
+   *
+   * <p>It depends on the catalog implementation to decide whether to use this property or not. It
+   * also depends on the catalog implementation to decide whether this property can be altered after
+   * schema creation. Besides, the behavior of altering this property (moving data or not) is also
+   * determined by the catalog implementation.
+   */
+  String PROPERTY_LOCATION = "location";
+
   /**
    * @return The name of the Schema.
    */
diff --git a/build.gradle.kts b/build.gradle.kts
index 4ad385c5cb..d805b00002 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -952,7 +952,7 @@ tasks {
       ":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
       ":catalogs:catalog-lakehouse-paimon:copyLibAndConfig",
       ":catalogs:catalog-model:copyLibAndConfig",
-      ":catalogs:catalog-generic-lakehouse:copyLibAndConfig"
+      ":catalogs:catalog-lakehouse-generic:copyLibAndConfig"
     )
   }
 
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
deleted file mode 100644
index d65107fe7e..0000000000
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog.lakehouse;
-
-import static org.apache.gravitino.Entity.EntityType.TABLE;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.time.Instant;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.Catalog;
-import org.apache.gravitino.Entity;
-import org.apache.gravitino.EntityAlreadyExistsException;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.Schema;
-import org.apache.gravitino.SchemaChange;
-import org.apache.gravitino.catalog.ManagedSchemaOperations;
-import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
-import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
-import org.apache.gravitino.connector.CatalogInfo;
-import org.apache.gravitino.connector.CatalogOperations;
-import org.apache.gravitino.connector.GenericLakehouseTable;
-import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.connector.SupportsSchemas;
-import org.apache.gravitino.exceptions.NoSuchCatalogException;
-import org.apache.gravitino.exceptions.NoSuchEntityException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NoSuchTableException;
-import org.apache.gravitino.exceptions.NonEmptySchemaException;
-import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
-import org.apache.gravitino.exceptions.TableAlreadyExistsException;
-import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.meta.SchemaEntity;
-import org.apache.gravitino.meta.TableEntity;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
-import org.apache.gravitino.rel.TableCatalog;
-import org.apache.gravitino.rel.TableChange;
-import org.apache.gravitino.rel.expressions.distributions.Distribution;
-import org.apache.gravitino.rel.expressions.sorts.SortOrder;
-import org.apache.gravitino.rel.expressions.transforms.Transform;
-import org.apache.gravitino.rel.indexes.Index;
-import org.apache.gravitino.storage.IdGenerator;
-import org.apache.gravitino.utils.PrincipalUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Operations for interacting with a generic lakehouse catalog in Apache 
Gravitino.
- *
- * <p>This catalog provides a unified interface for managing lakehouse table 
formats. It is designed
- * to be extensible and can support various table formats through a common 
interface.
- */
-public class GenericLakehouseCatalogOperations
-    implements CatalogOperations, SupportsSchemas, TableCatalog {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(GenericLakehouseCatalogOperations.class);
-
-  private static final String SLASH = "/";
-
-  private final ManagedSchemaOperations managedSchemaOps;
-
-  private Optional<Path> catalogLakehouseLocation;
-
-  private static final Map<LakehouseTableFormat, LakehouseCatalogOperations> 
SUPPORTED_FORMATS =
-      Maps.newConcurrentMap();
-
-  private Map<String, String> catalogConfig;
-  private CatalogInfo catalogInfo;
-  private HasPropertyMetadata propertiesMetadata;
-  private EntityStore store;
-  /**
-   * Initializes the generic lakehouse catalog operations with the provided 
configuration.
-   *
-   * @param conf The configuration map for the generic catalog operations.
-   * @param info The catalog info associated with this operation instance.
-   * @param propertiesMetadata The properties metadata of generic lakehouse 
catalog.
-   * @throws RuntimeException if initialization fails.
-   */
-  @Override
-  public void initialize(
-      Map<String, String> conf, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
-      throws RuntimeException {
-    String catalogLocation =
-        (String)
-            propertiesMetadata
-                .catalogPropertiesMetadata()
-                .getOrDefault(conf, 
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
-    this.catalogLakehouseLocation =
-        StringUtils.isNotBlank(catalogLocation)
-            ? 
Optional.of(catalogLocation).map(this::ensureTrailingSlash).map(Path::new)
-            : Optional.empty();
-    this.store = GravitinoEnv.getInstance().entityStore();
-    this.catalogConfig = conf;
-    this.catalogInfo = info;
-    this.propertiesMetadata = propertiesMetadata;
-  }
-
-  public GenericLakehouseCatalogOperations() {
-    this(GravitinoEnv.getInstance().entityStore());
-  }
-
-  @VisibleForTesting
-  GenericLakehouseCatalogOperations(EntityStore store) {
-    this.managedSchemaOps =
-        new ManagedSchemaOperations() {
-          @Override
-          protected EntityStore store() {
-            return store;
-          }
-        };
-  }
-
-  @Override
-  public void close() {}
-
-  @Override
-  public void testConnection(
-      NameIdentifier catalogIdent,
-      Catalog.Type type,
-      String provider,
-      String comment,
-      Map<String, String> properties) {
-    // No-op for generic lakehouse catalog.
-  }
-
-  @Override
-  public NameIdentifier[] listSchemas(Namespace namespace) throws 
NoSuchCatalogException {
-    return managedSchemaOps.listSchemas(namespace);
-  }
-
-  @Override
-  public Schema createSchema(NameIdentifier ident, String comment, Map<String, 
String> properties)
-      throws NoSuchCatalogException, SchemaAlreadyExistsException {
-    return managedSchemaOps.createSchema(ident, comment, properties);
-  }
-
-  @Override
-  public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
-    return managedSchemaOps.loadSchema(ident);
-  }
-
-  @Override
-  public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
-      throws NoSuchSchemaException {
-    return managedSchemaOps.alterSchema(ident, changes);
-  }
-
-  @Override
-  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws 
NonEmptySchemaException {
-    return managedSchemaOps.dropSchema(ident, cascade);
-  }
-
-  // ==================== Table Operations (In Development) 
====================
-  // TODO: Implement table operations in subsequent releases
-  // See: https://github.com/apache/gravitino/issues/8838
-  // The table operations will delegate to format-specific implementations
-  // (e.g., LanceCatalogOperations for Lance tables)
-
-  @Override
-  public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
-    NameIdentifier identifier = NameIdentifier.of(namespace.levels());
-    try {
-      store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
-    } catch (NoSuchEntityException e) {
-      throw new NoSuchSchemaException(e, "Schema %s does not exist", 
namespace);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to get schema " + identifier);
-    }
-
-    try {
-      List<TableEntity> tableEntityList = store.list(namespace, 
TableEntity.class, TABLE);
-      return tableEntityList.stream()
-          .map(e -> NameIdentifier.of(namespace, e.name()))
-          .toArray(NameIdentifier[]::new);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
-    }
-  }
-
-  @Override
-  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
-    try {
-      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
-      return GenericLakehouseTable.builder()
-          .withProperties(tableEntity.properties())
-          .withAuditInfo(tableEntity.auditInfo())
-          .withSortOrders(tableEntity.sortOrders())
-          .withPartitioning(tableEntity.partitioning())
-          .withDistribution(tableEntity.distribution())
-          .withColumns(EntityConverter.toColumns(tableEntity.columns()))
-          .withIndexes(tableEntity.indexes())
-          .withName(tableEntity.name())
-          .withComment(tableEntity.comment())
-          .build();
-    } catch (NoSuchEntityException e) {
-      throw new NoSuchTableException(e, "Table %s does not exist", ident);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to list tables under schema " + 
ident.namespace(), e);
-    }
-  }
-
-  @Override
-  public Table createTable(
-      NameIdentifier ident,
-      Column[] columns,
-      String comment,
-      Map<String, String> properties,
-      Transform[] partitions,
-      Distribution distribution,
-      SortOrder[] sortOrders,
-      Index[] indexes)
-      throws NoSuchSchemaException, TableAlreadyExistsException {
-    Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
-
-    String tableLocation = calculateTableLocation(schema, ident, properties);
-    Map<String, String> tableStorageProps = calculateTableStorageProps(schema, 
properties);
-
-    Map<String, String> newProperties = Maps.newHashMap(properties);
-    
newProperties.put(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION, 
tableLocation);
-    newProperties.putAll(tableStorageProps);
-
-    AuditInfo auditInfo =
-        AuditInfo.builder()
-            .withCreator(PrincipalUtils.getCurrentUserName())
-            .withCreateTime(Instant.now())
-            .build();
-    IdGenerator idGenerator = GravitinoEnv.getInstance().idGenerator();
-    List<ColumnEntity> columnEntityList =
-        IntStream.range(0, columns.length)
-            .mapToObj(
-                i -> ColumnEntity.toColumnEntity(columns[i], i, 
idGenerator.nextId(), auditInfo))
-            .collect(Collectors.toList());
-
-    try {
-      TableEntity entityToStore =
-          TableEntity.builder()
-              .withName(ident.name())
-              .withNamespace(ident.namespace())
-              .withColumns(columnEntityList)
-              .withProperties(newProperties)
-              .withComment(comment)
-              .withPartitioning(partitions)
-              .withSortOrders(sortOrders)
-              .withDistribution(distribution)
-              .withIndexes(indexes)
-              .withId(idGenerator.nextId())
-              .withAuditInfo(auditInfo)
-              .build();
-      store.put(entityToStore);
-    } catch (EntityAlreadyExistsException e) {
-      throw new TableAlreadyExistsException(e, "Table %s already exists in the 
metadata", ident);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to create table metadata for " + 
ident, e);
-    }
-
-    // Get the value of register in table properties
-    boolean register =
-        Boolean.parseBoolean(
-            properties.getOrDefault(
-                GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_REGISTER, 
"false"));
-    if (register) {
-      // Do not need to create the physical table if this is a registration 
operation.
-      // Whether we need to check the existence of the physical table?
-      GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
-      return builder
-          .withName(ident.name())
-          .withColumns(columns)
-          .withComment(comment)
-          .withProperties(properties)
-          .withDistribution(distribution)
-          .withIndexes(indexes)
-          .withAuditInfo(
-              AuditInfo.builder()
-                  .withCreator(PrincipalUtils.getCurrentUserName())
-                  .withCreateTime(Instant.now())
-                  .build())
-          .withPartitioning(partitions)
-          .withSortOrders(sortOrders)
-          .withFormat(LakehouseTableFormat.LANCE.lowerName())
-          .build();
-    }
-
-    try {
-      LakehouseCatalogOperations lanceCatalogOperations =
-          getLakehouseCatalogOperations(newProperties);
-      return lanceCatalogOperations.createTable(
-          ident, columns, comment, newProperties, partitions, distribution, 
sortOrders, indexes);
-    } catch (Exception e) {
-      // Try to roll back the metadata entry in Gravitino store
-      try {
-        store.delete(ident, Entity.EntityType.TABLE);
-      } catch (IOException ioException) {
-        LOG.error(
-            "Failed to roll back the metadata entry for table {} after 
physical table creation failure.",
-            ident,
-            ioException);
-      }
-      if (e.getClass().isAssignableFrom(RuntimeException.class)) {
-        throw e;
-      }
-
-      throw new RuntimeException("Failed to create table " + ident, e);
-    }
-  }
-
-  private String calculateTableLocation(
-      Schema schema, NameIdentifier tableIdent, Map<String, String> 
tableProperties) {
-    String tableLocation =
-        (String)
-            propertiesMetadata
-                .tablePropertiesMetadata()
-                .getOrDefault(
-                    tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
-    if (StringUtils.isNotBlank(tableLocation)) {
-      return ensureTrailingSlash(tableLocation);
-    }
-
-    String schemaLocation =
-        schema.properties() == null
-            ? null
-            : 
schema.properties().get(GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
-
-    // If we do not set location in table properties, and schema location is 
set, use schema
-    // location as the base path.
-    if (StringUtils.isNotBlank(schemaLocation)) {
-      return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
-    }
-
-    // If the schema location is not set, use catalog lakehouse dir as the 
base path. Or else, throw
-    // an exception.
-    if (catalogLakehouseLocation.isEmpty()) {
-      throw new RuntimeException(
-          String.format(
-              "No location specified for table %s, you need to set location 
either in catalog, schema, or table properties",
-              tableIdent));
-    }
-
-    String catalogLakehousePath = catalogLakehouseLocation.get().toString();
-    String[] nsLevels = tableIdent.namespace().levels();
-    String schemaName = nsLevels[nsLevels.length - 1];
-    return ensureTrailingSlash(catalogLakehousePath)
-        + schemaName
-        + SLASH
-        + tableIdent.name()
-        + SLASH;
-  }
-
-  @Override
-  public Table alterTable(NameIdentifier ident, TableChange... changes)
-      throws NoSuchTableException, IllegalArgumentException {
-    try {
-      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
-      Map<String, String> tableProperties = tableEntity.properties();
-      LakehouseCatalogOperations lakehouseCatalogOperations =
-          getLakehouseCatalogOperations(tableProperties);
-      return lakehouseCatalogOperations.alterTable(ident, changes);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to alter table " + ident, e);
-    }
-  }
-
-  @Override
-  public boolean purgeTable(NameIdentifier ident) {
-    try {
-      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
-      LakehouseCatalogOperations lakehouseCatalogOperations =
-          getLakehouseCatalogOperations(tableEntity.properties());
-      return lakehouseCatalogOperations.purgeTable(ident);
-    } catch (NoSuchTableException e) {
-      LOG.warn("Table {} does not exist, skip purging it.", ident);
-      return false;
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to purge table: " + ident, e);
-    }
-  }
-
-  @Override
-  public boolean dropTable(NameIdentifier ident) throws 
UnsupportedOperationException {
-    try {
-      // Only delete the metadata entry here. The physical data will not be 
deleted.
-      if (!tableExists(ident)) {
-        return false;
-      }
-      return store.delete(ident, Entity.EntityType.TABLE);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to drop table " + ident, e);
-    }
-  }
-
-  private LakehouseCatalogOperations getLakehouseCatalogOperations(
-      Map<String, String> tableProperties) {
-    LakehouseTableFormat format =
-        (LakehouseTableFormat)
-            propertiesMetadata
-                .tablePropertiesMetadata()
-                .getOrDefault(
-                    tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
-
-    return SUPPORTED_FORMATS.compute(
-        format,
-        (k, v) ->
-            v == null
-                ? createLakehouseCatalogOperations(
-                    format, tableProperties, catalogInfo, propertiesMetadata)
-                : v);
-  }
-
-  private String ensureTrailingSlash(String path) {
-    return path.endsWith(SLASH) ? path : path + SLASH;
-  }
-
-  private LakehouseCatalogOperations createLakehouseCatalogOperations(
-      LakehouseTableFormat format,
-      Map<String, String> properties,
-      CatalogInfo catalogInfo,
-      HasPropertyMetadata propertiesMetadata) {
-    LakehouseCatalogOperations operations;
-    switch (format) {
-      case LANCE:
-        operations = new LanceCatalogOperations();
-        break;
-      default:
-        throw new UnsupportedOperationException("Unsupported lakehouse format: 
" + format);
-    }
-
-    operations.initialize(properties, catalogInfo, propertiesMetadata);
-    return operations;
-  }
-
-  /**
-   * Calculate the table storage properties by merging catalog config, schema 
properties and table
-   * properties. The precedence is: table properties > schema properties > 
catalog config.
-   *
-   * @param schema The schema of the table.
-   * @param tableProps The table properties.
-   * @return The merged table storage properties.
-   */
-  private Map<String, String> calculateTableStorageProps(
-      Schema schema, Map<String, String> tableProps) {
-    Map<String, String> storageProps = 
getLanceTableStorageOptions(catalogConfig);
-    storageProps.putAll(getLanceTableStorageOptions(schema.properties()));
-    storageProps.putAll(getLanceTableStorageOptions(tableProps));
-    return storageProps;
-  }
-
-  private Map<String, String> getLanceTableStorageOptions(Map<String, String> 
properties) {
-    if (MapUtils.isEmpty(properties)) {
-      return Maps.newHashMap();
-    }
-    return properties.entrySet().stream()
-        .filter(
-            e ->
-                e.getKey()
-                    .startsWith(
-                        
GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-  }
-}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
deleted file mode 100644
index 72c1e5bc57..0000000000
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog.lakehouse;
-
-import static 
org.apache.gravitino.connector.PropertyEntry.booleanPropertyEntry;
-import static org.apache.gravitino.connector.PropertyEntry.enumPropertyEntry;
-import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import org.apache.gravitino.connector.BasePropertiesMetadata;
-import org.apache.gravitino.connector.PropertyEntry;
-
-public class GenericLakehouseTablePropertiesMetadata extends 
BasePropertiesMetadata {
-  public static final String LAKEHOUSE_LOCATION = "location";
-  public static final String LAKEHOUSE_FORMAT = "format";
-  public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX = 
"lance.storage.";
-  public static final String LAKEHOUSE_REGISTER = "register";
-
-  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
-
-  static {
-    List<PropertyEntry<?>> propertyEntries =
-        ImmutableList.of(
-            stringOptionalPropertyEntry(
-                LAKEHOUSE_LOCATION,
-                "The root directory of the lakehouse catalog.",
-                false /* immutable */,
-                null, /* defaultValue */
-                false /* hidden */),
-            enumPropertyEntry(
-                LAKEHOUSE_FORMAT,
-                "The table format of the lakehouse table (e.g., iceberg, 
delta, lance)",
-                true /* required */,
-                true /* immutable */,
-                LakehouseTableFormat.class /* enumClass */,
-                null /* defaultValue */,
-                false /* hidden */,
-                false /* reserved */),
-            PropertyEntry.stringOptionalPropertyPrefixEntry(
-                LANCE_TABLE_STORAGE_OPTION_PREFIX,
-                "The storage options passed to Lance table.",
-                false /* immutable */,
-                null /* default value*/,
-                false /* hidden */,
-                false /* reserved */),
-            booleanPropertyEntry(
-                LAKEHOUSE_REGISTER,
-                "Whether this is a table registration operation.",
-                false,
-                true /* immutable */,
-                false /* defaultValue */,
-                false /* hidden */,
-                false));
-
-    PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, 
PropertyEntry::getName);
-  }
-
-  @Override
-  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return PROPERTIES_METADATA;
-  }
-}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
deleted file mode 100644
index d5b95845db..0000000000
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse;
-
-import org.apache.gravitino.connector.CatalogOperations;
-import org.apache.gravitino.rel.TableCatalog;
-
-/**
- * Interface for detailed lakehouse catalog operations, combining catalog 
operations and table
- * catalog. {@link GenericLakehouseCatalog} will try to use this interface to 
provide detailed
- * lakehouse catalog operations.
- *
- * <pre>
- *    GenericLakehouseCatalog.createTable()
- *       -> LakehouseCatalogOperations.createTable()
- *         -> LanceTableOperations.createTable()
- *         -> IcebergTableOperations.createTable()
- *         -> DeltaTableOperations.createTable()
- *         ...
- * </pre>
- */
-public interface LakehouseCatalogOperations extends CatalogOperations, 
TableCatalog {}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
deleted file mode 100644
index 57d0230f48..0000000000
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse;
-
-public enum LakehouseTableFormat {
-  LANCE,
-
-  DELTA,
-
-  ICEBERG;
-
-  public String lowerName() {
-    return this.name().toLowerCase();
-  }
-
-  public static LakehouseTableFormat fromFormatName(String type) {
-    for (LakehouseTableFormat tableType : LakehouseTableFormat.values()) {
-      if (tableType.name().equalsIgnoreCase(type)) {
-        return tableType;
-      }
-    }
-    throw new IllegalArgumentException("Unknown LakehouseTableFormat: " + type);
-  }
-}
diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
deleted file mode 100644
index 133644dccc..0000000000
--- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse.lance;
-
-import static org.apache.gravitino.Entity.EntityType.TABLE;
-
-import com.google.common.collect.Lists;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.WriteParams;
-import com.lancedb.lance.index.DistanceType;
-import com.lancedb.lance.index.IndexParams;
-import com.lancedb.lance.index.IndexType;
-import com.lancedb.lance.index.vector.VectorIndexParams;
-import java.io.IOException;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.gravitino.Catalog;
-import org.apache.gravitino.Entity;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata;
-import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
-import org.apache.gravitino.catalog.lakehouse.LakehouseTableFormat;
-import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
-import org.apache.gravitino.connector.CatalogInfo;
-import org.apache.gravitino.connector.GenericLakehouseTable;
-import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.exceptions.NoSuchEntityException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NoSuchTableException;
-import org.apache.gravitino.exceptions.TableAlreadyExistsException;
-import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
-import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
-import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.TableEntity;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
-import org.apache.gravitino.rel.TableChange;
-import org.apache.gravitino.rel.expressions.distributions.Distribution;
-import org.apache.gravitino.rel.expressions.sorts.SortOrder;
-import org.apache.gravitino.rel.expressions.transforms.Transform;
-import org.apache.gravitino.rel.indexes.Index;
-import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
-import org.apache.gravitino.utils.PrincipalUtils;
-
-public class LanceCatalogOperations implements LakehouseCatalogOperations {
-
-  private EntityStore store;
-
-  @Override
-  public void initialize(
-      Map<String, String> config, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
-      throws RuntimeException {
-    store = GravitinoEnv.getInstance().entityStore();
-  }
-
-  @Override
-  public void testConnection(
-      NameIdentifier catalogIdent,
-      Catalog.Type type,
-      String provider,
-      String comment,
-      Map<String, String> properties)
-      throws Exception {}
-
-  @Override
-  public void close() throws IOException {}
-
-  @Override
-  public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
-    // No need to do nothing here as GenericLakehouseCatalogOperations will do the work.
-    throw new UnsupportedOperationException(
-        "We should not reach here as we could get table info" + "from metastore directly.");
-  }
-
-  @Override
-  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
-    // No need to do nothing here as GenericLakehouseCatalogOperations will do the work.
-    throw new UnsupportedOperationException(
-        "We should not reach here as we could get table info" + "from metastore directly.");
-  }
-
-  @Override
-  public Table createTable(
-      NameIdentifier ident,
-      Column[] columns,
-      String comment,
-      Map<String, String> properties,
-      Transform[] partitions,
-      Distribution distribution,
-      SortOrder[] sortOrders,
-      Index[] indexes)
-      throws NoSuchSchemaException, TableAlreadyExistsException {
-    // Ignore partitions, distributions, sortOrders, and indexes for Lance tables;
-    String location = properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
-    Map<String, String> storageProps = LancePropertiesUtils.getLanceStorageOptions(properties);
-
-    try (Dataset ignored =
-        Dataset.create(
-            new RootAllocator(),
-            location,
-            convertColumnsToArrowSchema(columns),
-            new WriteParams.Builder().withStorageOptions(storageProps).build())) {
-      GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
-      return builder
-          .withName(ident.name())
-          .withColumns(columns)
-          .withComment(comment)
-          .withProperties(properties)
-          .withDistribution(distribution)
-          .withIndexes(indexes)
-          .withAuditInfo(
-              AuditInfo.builder()
-                  .withCreator(PrincipalUtils.getCurrentUserName())
-                  .withCreateTime(Instant.now())
-                  .build())
-          .withPartitioning(partitions)
-          .withSortOrders(sortOrders)
-          .withFormat(LakehouseTableFormat.LANCE.lowerName())
-          .build();
-    }
-  }
-
-  private org.apache.arrow.vector.types.pojo.Schema convertColumnsToArrowSchema(Column[] columns) {
-    List<Field> fields =
-        Arrays.stream(columns)
-            .map(
-                col ->
-                    LanceDataTypeConverter.CONVERTER.toArrowField(
-                        col.name(), col.dataType(), col.nullable()))
-            .collect(Collectors.toList());
-    return new org.apache.arrow.vector.types.pojo.Schema(fields);
-  }
-
-  @Override
-  public Table alterTable(NameIdentifier ident, TableChange... changes)
-      throws NoSuchTableException, IllegalArgumentException {
-    // Lance only supports adding indexes for now.
-    List<Index> addedIndexes = Lists.newArrayList();
-
-    // Only support for adding index for now.
-    for (TableChange change : changes) {
-      if (change instanceof TableChange.AddIndex addIndexChange) {
-        Index index =
-            IndexImpl.builder()
-                .withIndexType(addIndexChange.getType())
-                .withName(addIndexChange.getName())
-                .withFieldNames(addIndexChange.getFieldNames())
-                .build();
-        addedIndexes.add(index);
-      }
-    }
-
-    TableEntity updatedEntity;
-    try {
-      TableEntity entity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class);
-      updatedEntity =
-          store.update(
-              ident,
-              TableEntity.class,
-              TABLE,
-              tableEntity ->
-                  TableEntity.builder()
-                      .withId(tableEntity.id())
-                      .withName(tableEntity.name())
-                      .withNamespace(tableEntity.namespace())
-                      .withAuditInfo(
-                          AuditInfo.builder()
-                              .withCreator(tableEntity.auditInfo().creator())
-                              .withCreateTime(tableEntity.auditInfo().createTime())
-                              .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
-                              .withLastModifiedTime(Instant.now())
-                              .build())
-                      .withColumns(tableEntity.columns())
-                      .withIndexes(
-                          ArrayUtils.addAll(entity.indexes(), addedIndexes.toArray(new Index[0])))
-                      .withDistribution(tableEntity.distribution())
-                      .withPartitioning(tableEntity.partitioning())
-                      .withSortOrders(tableEntity.sortOrders())
-                      .withProperties(tableEntity.properties())
-                      .withComment(tableEntity.comment())
-                      .build());
-
-      // Add indexes to Lance dataset
-      addLanceIndex(updatedEntity, addedIndexes);
-
-      // return the updated table
-      return GenericLakehouseTable.builder()
-          .withProperties(updatedEntity.properties())
-          .withAuditInfo(updatedEntity.auditInfo())
-          .withSortOrders(updatedEntity.sortOrders())
-          .withPartitioning(updatedEntity.partitioning())
-          .withDistribution(updatedEntity.distribution())
-          .withColumns(EntityConverter.toColumns(updatedEntity.columns()))
-          .withIndexes(updatedEntity.indexes())
-          .withName(updatedEntity.name())
-          .withComment(updatedEntity.comment())
-          .build();
-    } catch (NoSuchEntityException e) {
-      throw new NoSuchTableException("No such table: %s", ident);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to load table entity for: " + ident, ioe);
-    }
-  }
-
-  private void addLanceIndex(TableEntity updatedEntity, List<Index> addedIndexes) {
-    String location =
-        updatedEntity.properties().get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
-    try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
-      // For Lance, we only support adding indexes, so in fact, we can't handle drop index here.
-      for (Index index : addedIndexes) {
-        IndexType indexType = IndexType.valueOf(index.type().name());
-        IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
-        dataset.createIndex(
-            Arrays.stream(index.fieldNames())
-                .map(fieldPath -> String.join(".", fieldPath))
-                .collect(Collectors.toList()),
-            indexType,
-            Optional.of(index.name()),
-            indexParams,
-            true);
-      }
-    }
-  }
-
-  private IndexParams getIndexParamsByIndexType(IndexType indexType) {
-    switch (indexType) {
-      case SCALAR:
-        return IndexParams.builder().build();
-      case VECTOR:
-        // TODO make these parameters configurable
-        int numberOfDimensions = 3; // this value should be determined dynamically based on the data
-        // Add properties to Index to set this value.
-        return IndexParams.builder()
-            .setVectorIndexParams(
-                VectorIndexParams.ivfPq(2, 8, numberOfDimensions, DistanceType.L2, 2))
-            .build();
-      default:
-        throw new IllegalArgumentException("Unsupported index type: " + indexType);
-    }
-  }
-
-  @Override
-  public boolean purgeTable(NameIdentifier ident) {
-    try {
-      TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class);
-      Map<String, String> lancePropertiesMap = tableEntity.properties();
-      String location =
-          lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
-
-      if (!store.delete(ident, Entity.EntityType.TABLE)) {
-        throw new RuntimeException("Failed to purge Lance table: " + ident.name());
-      }
-
-      // Drop the Lance dataset from cloud storage.
-      Dataset.drop(location, LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap));
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to purge Lance table: " + ident.name(), e);
-    }
-  }
-
-  @Override
-  public boolean dropTable(NameIdentifier ident) {
-    // TODO We will handle it in GenericLakehouseCatalogOperations. However, we need
-    //  to figure out it's a external table or not first. we will introduce a property
-    //  to indicate that. Currently, all Lance tables are external tables. `drop` will
-    //  just remove the metadata in metastore and will not delete data in storage.
-    throw new UnsupportedOperationException(
-        "LanceCatalogOperations does not support dropTable operation.");
-  }
-}
diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
deleted file mode 100644
index 734309a444..0000000000
--- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse.utils;
-
-import java.util.List;
-import org.apache.gravitino.connector.GenericLakehouseColumn;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.rel.Column;
-
-public class EntityConverter {
-  public static Column[] toColumns(List<ColumnEntity> columnEntities) {
-    return columnEntities.stream().map(EntityConverter::toColumn).toArray(Column[]::new);
-  }
-
-  private static Column toColumn(ColumnEntity columnEntity) {
-    return GenericLakehouseColumn.builder()
-        .withName(columnEntity.name())
-        .withComment(columnEntity.comment())
-        .withAutoIncrement(columnEntity.autoIncrement())
-        .withNullable(columnEntity.nullable())
-        .withType(columnEntity.dataType())
-        .withDefaultValue(columnEntity.defaultValue())
-        .build();
-  }
-}
diff --git a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
deleted file mode 100644
index 9da5ed530e..0000000000
--- a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse.utils;
-
-import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
-
-import java.util.List;
-import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.rel.types.Types;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestEntityConverter {
-
-  @Test
-  void testToColumns() {
-    AuditInfo auditInfo = AuditInfo.builder().build();
-    List<ColumnEntity> columnEntities =
-        List.of(
-            ColumnEntity.builder()
-                .withName("id")
-                .withId(1L)
-                .withDataType(Types.IntegerType.get())
-                .withComment("Identifier")
-                .withAutoIncrement(true)
-                .withNullable(false)
-                .withDefaultValue(DEFAULT_VALUE_NOT_SET)
-                .withAuditInfo(auditInfo)
-                .withPosition(1)
-                .build(),
-            ColumnEntity.builder()
-                .withName("name")
-                .withId(2L)
-                .withDataType(Types.StringType.get())
-                .withComment("Name of the entity")
-                .withAutoIncrement(false)
-                .withNullable(true)
-                .withDefaultValue(DEFAULT_VALUE_NOT_SET)
-                .withPosition(2)
-                .withAuditInfo(auditInfo)
-                .build());
-    var columns = EntityConverter.toColumns(columnEntities);
-    Assertions.assertEquals(2, columns.length);
-    for (var column : columns) {
-      if (column.name().equals("id")) {
-        Assertions.assertEquals(Types.IntegerType.get(), column.dataType());
-        Assertions.assertEquals("Identifier", column.comment());
-        Assertions.assertTrue(column.autoIncrement());
-        Assertions.assertFalse(column.nullable());
-        Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, column.defaultValue());
-      } else if (column.name().equals("name")) {
-        Assertions.assertEquals(Types.StringType.get(), column.dataType());
-        Assertions.assertEquals("Name of the entity", column.comment());
-        Assertions.assertFalse(column.autoIncrement());
-        Assertions.assertTrue(column.nullable());
-        Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, column.defaultValue());
-      }
-    }
-  }
-}
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts b/catalogs/catalog-lakehouse-generic/build.gradle.kts
similarity index 93%
rename from catalogs/catalog-generic-lakehouse/build.gradle.kts
rename to catalogs/catalog-lakehouse-generic/build.gradle.kts
index 7dd0a4bc16..484514bede 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-generic/build.gradle.kts
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-description = "catalog-generic-lakehouse"
+description = "catalog-lakehouse-generic"
 
 plugins {
   `maven-publish`
@@ -43,7 +43,6 @@ dependencies {
   implementation(libs.commons.io)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
-  implementation(libs.hadoop3.client.api)
   implementation(libs.lance)
 
   annotationProcessor(libs.lombok)
@@ -86,12 +85,12 @@ tasks {
       exclude("log4j-*.jar")
       exclude("slf4j-*.jar")
     }
-    into("$rootDir/distribution/package/catalogs/generic-lakehouse/libs")
+    into("$rootDir/distribution/package/catalogs/lakehouse-generic/libs")
   }
 
   val copyCatalogConfig by registering(Copy::class) {
     from("src/main/resources")
-    into("$rootDir/distribution/package/catalogs/generic-lakehouse/conf")
+    into("$rootDir/distribution/package/catalogs/lakehouse-generic/conf")
 
     rename { original ->
       if (original.endsWith(".template")) {
diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalog.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalog.java
similarity index 69%
rename from catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalog.java
rename to catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalog.java
index 68072f55ba..c8000989a4 100644
--- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalog.java
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalog.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
 import java.util.Map;
 import org.apache.gravitino.connector.BaseCatalog;
@@ -25,16 +25,16 @@ import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.connector.capability.Capability;
 
 /** Implementation of a generic lakehouse catalog in Apache Gravitino. */
-public class GenericLakehouseCatalog extends BaseCatalog<GenericLakehouseCatalog> {
+public class GenericCatalog extends BaseCatalog<GenericCatalog> {
 
-  static final GenericLakehouseCatalogPropertiesMetadata CATALOG_PROPERTIES_METADATA =
-      new GenericLakehouseCatalogPropertiesMetadata();
+  private static final GenericCatalogPropertiesMetadata CATALOG_PROPERTIES_METADATA =
+      new GenericCatalogPropertiesMetadata();
 
-  static final GenericLakehouseSchemaPropertiesMetadata SCHEMA_PROPERTIES_METADATA =
-      new GenericLakehouseSchemaPropertiesMetadata();
+  private static final GenericSchemaPropertiesMetadata SCHEMA_PROPERTIES_METADATA =
+      new GenericSchemaPropertiesMetadata();
 
-  static final GenericLakehouseTablePropertiesMetadata TABLE_PROPERTIES_METADATA =
-      new GenericLakehouseTablePropertiesMetadata();
+  private static final GenericTablePropertiesMetadata TABLE_PROPERTIES_METADATA =
+      new GenericTablePropertiesMetadata();
 
   /**
    * Returns the short name of the generic lakehouse catalog.
@@ -43,24 +43,23 @@ public class GenericLakehouseCatalog extends BaseCatalog<GenericLakehouseCatalog
    */
   @Override
   public String shortName() {
-    return "generic-lakehouse";
+    return "lakehouse-generic";
   }
 
   /**
-   * Creates a new instance of {@link GenericLakehouseCatalogOperations} with the provided
-   * configuration.
+   * Creates a new instance of {@link GenericCatalogOperations} with the provided configuration.
    *
    * @param config The configuration map for the generic catalog operations.
-   * @return A new instance of {@link GenericLakehouseCatalogOperations}.
+   * @return A new instance of {@link GenericCatalogOperations}.
    */
   @Override
   protected CatalogOperations newOps(Map<String, String> config) {
-    return new GenericLakehouseCatalogOperations();
+    return new GenericCatalogOperations();
   }
 
   @Override
   public Capability newCapability() {
-    return new GenericLakehouseCatalogCapability();
+    return new GenericCatalogCapability();
   }
 
   @Override
diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogCapability.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
similarity index 69%
rename from catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogCapability.java
rename to catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
index 412b82d6a7..5900d0b51e 100644
--- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogCapability.java
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
@@ -16,15 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
+import java.util.Objects;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.connector.capability.CapabilityResult;
 
-public class GenericLakehouseCatalogCapability implements Capability {
+public class GenericCatalogCapability implements Capability {
 
   @Override
   public CapabilityResult managedStorage(Scope scope) {
-    return CapabilityResult.SUPPORTED;
+    if (Objects.requireNonNull(scope) == Scope.TABLE
+        || Objects.requireNonNull(scope) == Scope.SCHEMA) {
+      return CapabilityResult.SUPPORTED;
+    }
+
+    return CapabilityResult.unsupported(
+        String.format("Generic catalog does not support managed storage for %s.", scope));
   }
 }
diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogOperations.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogOperations.java
new file mode 100644
index 0000000000..e31263fadc
--- /dev/null
+++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogOperations.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.generic;
+
+import static org.apache.gravitino.Entity.EntityType.TABLE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.storage.IdGenerator;
+
+/** Operations for interacting with a generic lakehouse catalog in Apache 
Gravitino. */
+public class GenericCatalogOperations implements CatalogOperations, 
SupportsSchemas, TableCatalog {
+
+  private static final String SLASH = "/";
+
+  private final ManagedSchemaOperations schemaOps;
+
+  private final Map<String, Supplier<ManagedTableOperations>> tableOpsCache;
+
+  private Optional<String> catalogLocation;
+
+  private HasPropertyMetadata propertiesMetadata;
+
+  private final Cache<NameIdentifier, String> tableFormatCache;
+
+  private final EntityStore store;
+
+  /**
+   * Creates the catalog operations backed by the entity store and the ID generator exposed by
+   * the {@link GravitinoEnv} singleton.
+   */
+  public GenericCatalogOperations() {
+    this(GravitinoEnv.getInstance().entityStore(), 
GravitinoEnv.getInstance().idGenerator());
+  }
+
+  /**
+   * Creates the catalog operations on top of the given entity store and ID generator.
+   *
+   * <p>One lazily-initialized {@link ManagedTableOperations} supplier is registered for every
+   * table format returned by {@link LakehouseTableDelegatorFactory#tableDelegators()}.
+   *
+   * @param store the entity store used by the schema operations and the table format lookup
+   * @param idGenerator the ID generator handed to every format-specific table operations
+   * @throws IllegalArgumentException if no table delegator is registered
+   */
+  @VisibleForTesting
+  GenericCatalogOperations(EntityStore store, IdGenerator idGenerator) {
+    this.store = store;
+
+    this.schemaOps =
+        new ManagedSchemaOperations() {
+          @Override
+          protected EntityStore store() {
+            return store;
+          }
+        };
+
+    this.tableFormatCache = CacheBuilder.newBuilder().maximumSize(1000).build();
+
+    // Register one memoized supplier per table format, so the format-specific table operations
+    // are only created the first time they are actually needed.
+    Map<String, Supplier<ManagedTableOperations>> opsByFormat = Maps.newHashMap();
+    for (Map.Entry<String, LakehouseTableDelegator> registered :
+        LakehouseTableDelegatorFactory.tableDelegators().entrySet()) {
+      LakehouseTableDelegator delegator = registered.getValue();
+      opsByFormat.put(
+          registered.getKey(),
+          Suppliers.memoize(() -> delegator.createTableOps(store, schemaOps, idGenerator)));
+    }
+    tableOpsCache = Collections.unmodifiableMap(opsByFormat);
+
+    if (tableOpsCache.isEmpty()) {
+      throw new IllegalArgumentException("No table delegators found, this is unexpected.");
+    }
+  }
+
+  /**
+   * Initializes the operations from the catalog configuration.
+   *
+   * <p>The catalog location is read through the catalog properties metadata. When it is not
+   * blank it is remembered with a trailing slash, otherwise the catalog location stays empty.
+   *
+   * @param conf the catalog properties
+   * @param info the catalog info (not used by this implementation)
+   * @param propertiesMetadata the property metadata, kept for later table location resolution
+   * @throws RuntimeException if the initialization fails
+   */
+  @Override
+  public void initialize(
+      Map<String, String> conf, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
+      throws RuntimeException {
+    Object configuredLocation =
+        propertiesMetadata
+            .catalogPropertiesMetadata()
+            .getOrDefault(conf, Catalog.PROPERTY_LOCATION);
+    String location = (String) configuredLocation;
+
+    if (StringUtils.isNotBlank(location)) {
+      this.catalogLocation = Optional.of(ensureTrailingSlash(location));
+    } else {
+      this.catalogLocation = Optional.empty();
+    }
+    this.propertiesMetadata = propertiesMetadata;
+  }
+
+  /**
+   * Releases the state held by these operations.
+   *
+   * <p>{@code Cache#cleanUp()} only performs pending maintenance and keeps the live entries, so
+   * the cached table formats are discarded explicitly before the maintenance pass.
+   */
+  @Override
+  public void close() {
+    tableFormatCache.invalidateAll();
+    tableFormatCache.cleanUp();
+  }
+
+  /**
+   * Tests the connection for the given catalog. This implementation performs no check at all
+   * and always returns normally.
+   *
+   * @param catalogIdent the identifier of the catalog (unused)
+   * @param type the catalog type (unused)
+   * @param provider the catalog provider (unused)
+   * @param comment the catalog comment (unused)
+   * @param properties the catalog properties (unused)
+   */
+  @Override
+  public void testConnection(
+      NameIdentifier catalogIdent,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties) {
+    // No-op for generic catalog.
+  }
+
+  /**
+   * Lists the schemas under the given namespace by delegating to the managed schema operations.
+   *
+   * @param namespace the namespace to list the schemas from
+   * @return the schema identifiers returned by the managed schema operations
+   * @throws NoSuchCatalogException propagated from the managed schema operations
+   */
+  @Override
+  public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
+    NameIdentifier[] schemaIdents = schemaOps.listSchemas(namespace);
+    return schemaIdents;
+  }
+
+  /**
+   * Creates a schema by delegating to the managed schema operations.
+   *
+   * @param ident the identifier of the schema to create
+   * @param comment the schema comment
+   * @param properties the schema properties
+   * @return the schema returned by the managed schema operations
+   * @throws NoSuchCatalogException propagated from the managed schema operations
+   * @throws SchemaAlreadyExistsException propagated from the managed schema operations
+   */
+  @Override
+  public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
+      throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    Schema createdSchema = schemaOps.createSchema(ident, comment, properties);
+    return createdSchema;
+  }
+
+  /**
+   * Loads a schema by delegating to the managed schema operations.
+   *
+   * @param ident the identifier of the schema to load
+   * @return the schema returned by the managed schema operations
+   * @throws NoSuchSchemaException propagated from the managed schema operations
+   */
+  @Override
+  public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+    Schema loadedSchema = schemaOps.loadSchema(ident);
+    return loadedSchema;
+  }
+
+  /**
+   * Applies the given changes to a schema by delegating to the managed schema operations.
+   *
+   * @param ident the identifier of the schema to alter
+   * @param changes the changes to apply
+   * @return the schema returned by the managed schema operations
+   * @throws NoSuchSchemaException propagated from the managed schema operations
+   */
+  @Override
+  public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
+      throws NoSuchSchemaException {
+    Schema alteredSchema = schemaOps.alterSchema(ident, changes);
+    return alteredSchema;
+  }
+
+  /**
+   * Drops a schema, optionally together with all the tables it contains.
+   *
+   * @param ident the identifier of the schema to drop
+   * @param cascade whether the tables under the schema are dropped as well
+   * @return {@code false} if listing the tables reports that the schema does not exist,
+   *     otherwise the result of the managed schema operations
+   * @throws NonEmptySchemaException if the schema still contains tables and {@code cascade} is
+   *     {@code false}
+   */
+  @Override
+  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
+    Namespace tableNs =
+        Namespace.of(ident.namespace().level(0), ident.namespace().level(1), ident.name());
+    NameIdentifier[] tableIdents;
+    try {
+      tableIdents = listTables(tableNs);
+    } catch (NoSuchSchemaException e) {
+      // If schema does not exist, return false.
+      return false;
+    }
+
+    if (!cascade && tableIdents.length > 0) {
+      throw new NonEmptySchemaException(
+          "Schema %s is not empty, cannot drop it without cascade", ident);
+    }
+
+    // Drop all tables under the schema first if cascade is true. Go through dropTable() instead
+    // of the raw table operations so that the cached table format of every dropped table is
+    // invalidated; otherwise a table re-created later under the same name with another format
+    // would be routed to the stale format's operations.
+    for (NameIdentifier tableIdent : tableIdents) {
+      dropTable(tableIdent);
+    }
+
+    return schemaOps.dropSchema(ident, cascade);
+  }
+
+  /**
+   * Lists the tables under the given schema namespace.
+   *
+   * @param namespace the schema namespace
+   * @return the table identifiers returned by the table operations
+   * @throws NoSuchSchemaException propagated from the table operations
+   */
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
+    // Listing tables is not format-specific, so the first registered table operations are used.
+    Supplier<ManagedTableOperations> anyFormatOps = tableOpsCache.values().iterator().next();
+    return anyFormatOps.get().listTables(namespace);
+  }
+
+  /**
+   * Loads a table through the operations of its table format and refreshes the cached format.
+   *
+   * @param ident the identifier of the table to load
+   * @return the loaded table
+   * @throws NoSuchTableException if the table does not exist
+   */
+  @Override
+  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    Table loadedTable = tableOps(ident).loadTable(ident);
+
+    // Remember the lower-cased format, when the loaded table carries one.
+    String rawFormat = loadedTable.properties().getOrDefault(Table.PROPERTY_TABLE_FORMAT, null);
+    if (rawFormat != null) {
+      tableFormatCache.put(ident, rawFormat.toLowerCase(Locale.ROOT));
+    }
+
+    return loadedTable;
+  }
+
+  /**
+   * Creates a table with the operations registered for the requested table format.
+   *
+   * <p>The table location is resolved from the table, schema and catalog properties, and the
+   * table is created with the resolved location and the lower-cased format in its properties.
+   *
+   * @param ident the identifier of the table to create
+   * @param columns the table columns
+   * @param comment the table comment
+   * @param properties the table properties; must contain the table format
+   * @param partitions the partitioning of the table
+   * @param distribution the distribution of the table
+   * @param sortOrders the sort orders of the table
+   * @param indexes the indexes of the table
+   * @return the created table
+   * @throws NoSuchSchemaException if the schema of the table does not exist
+   * @throws TableAlreadyExistsException propagated from the format-specific table operations
+   * @throws IllegalArgumentException if the format is missing or unsupported, or if no location
+   *     can be resolved
+   */
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    String resolvedLocation = calculateTableLocation(loadSchema(schemaIdent), ident, properties);
+
+    String requestedFormat = properties.getOrDefault(Table.PROPERTY_TABLE_FORMAT, null);
+    Preconditions.checkArgument(
+        requestedFormat != null, "Table format must be specified in table properties");
+    String format = requestedFormat.toLowerCase(Locale.ROOT);
+
+    // Persist the resolved location and the normalized format together with the table.
+    Map<String, String> effectiveProperties = Maps.newHashMap(properties);
+    effectiveProperties.put(Table.PROPERTY_LOCATION, resolvedLocation);
+    effectiveProperties.put(Table.PROPERTY_TABLE_FORMAT, format);
+
+    // Pick the table operations registered for the requested format.
+    Supplier<ManagedTableOperations> formatOps = tableOpsCache.get(format);
+    Preconditions.checkArgument(formatOps != null, "Unsupported table format: %s", format);
+
+    Table created =
+        formatOps
+            .get()
+            .createTable(
+                ident,
+                columns,
+                comment,
+                effectiveProperties,
+                partitions,
+                distribution,
+                sortOrders,
+                indexes);
+    // Keep the format at hand so later operations on this table skip the store lookup.
+    tableFormatCache.put(ident, format);
+    return created;
+  }
+
+  /**
+   * Alters a table through the operations of its table format.
+   *
+   * @param ident the identifier of the table to alter
+   * @param changes the changes to apply
+   * @return the altered table
+   * @throws NoSuchTableException if the table does not exist
+   * @throws IllegalArgumentException propagated from the table operations
+   */
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    Table alteredTable = tableOps(ident).alterTable(ident, changes);
+
+    // A rename makes the entry cached under the old identifier obsolete.
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.RenameTable) {
+        tableFormatCache.invalidate(ident);
+        break;
+      }
+    }
+
+    return alteredTable;
+  }
+
+  /**
+   * Purges a table through the operations of its table format and forgets its cached format.
+   *
+   * @param ident the identifier of the table to purge
+   * @return the result of the format-specific purge, or {@code false} if the table does not exist
+   */
+  @Override
+  public boolean purgeTable(NameIdentifier ident) {
+    ManagedTableOperations ops;
+    try {
+      ops = tableOps(ident);
+    } catch (NoSuchTableException e) {
+      // Resolving the table format already failed because the table is not in the store; report
+      // "nothing purged" instead of leaking the lookup failure to the caller.
+      return false;
+    }
+
+    boolean purged = ops.purgeTable(ident);
+    tableFormatCache.invalidate(ident);
+    return purged;
+  }
+
+  /**
+   * Drops a table through the operations of its table format and forgets its cached format.
+   *
+   * @param ident the identifier of the table to drop
+   * @return the result of the format-specific drop, or {@code false} if the table does not exist
+   * @throws UnsupportedOperationException propagated from the table operations
+   */
+  @Override
+  public boolean dropTable(NameIdentifier ident) throws UnsupportedOperationException {
+    ManagedTableOperations ops;
+    try {
+      ops = tableOps(ident);
+    } catch (NoSuchTableException e) {
+      // Resolving the table format already failed because the table is not in the store; report
+      // "nothing dropped" instead of leaking the lookup failure to the caller.
+      return false;
+    }
+
+    boolean dropped = ops.dropTable(ident);
+    tableFormatCache.invalidate(ident);
+    return dropped;
+  }
+
+  /**
+   * Resolves the storage location of a table, always ending with a slash.
+   *
+   * <p>The first location that is set wins, in this order: the table properties, the schema
+   * location followed by the table name, the catalog location followed by the schema name and
+   * the table name.
+   *
+   * @param schema the schema the table belongs to
+   * @param tableIdent the identifier of the table
+   * @param tableProperties the properties of the table
+   * @return the resolved table location
+   * @throws IllegalArgumentException if none of the three levels provides a location
+   */
+  private String calculateTableLocation(
+      Schema schema, NameIdentifier tableIdent, Map<String, String> tableProperties) {
+    // 1. An explicit table location is used as is.
+    Object explicitLocation =
+        propertiesMetadata
+            .tablePropertiesMetadata()
+            .getOrDefault(tableProperties, Table.PROPERTY_LOCATION);
+    String tableLocation = (String) explicitLocation;
+    if (StringUtils.isNotBlank(tableLocation)) {
+      return ensureTrailingSlash(tableLocation);
+    }
+
+    // 2. Otherwise the schema location, when set, is the base path.
+    String schemaLocation = null;
+    if (schema.properties() != null) {
+      schemaLocation = schema.properties().get(Schema.PROPERTY_LOCATION);
+    }
+    if (StringUtils.isNotBlank(schemaLocation)) {
+      return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
+    }
+
+    // 3. Otherwise fall back to the catalog location, or fail when it is not set either.
+    String catalogBase =
+        catalogLocation.orElseThrow(
+            () ->
+                new IllegalArgumentException(
+                    "'location' property is neither set in table properties "
+                        + "nor in schema properties, and no location is set in catalog properties either. "
+                        + "Please set the 'location' in either of them to create the table "
+                        + tableIdent));
+
+    String schemaName = tableIdent.namespace().level(2);
+    return ensureTrailingSlash(catalogBase) + schemaName + SLASH + tableIdent.name() + SLASH;
+  }
+
+  /**
+   * Returns the given path unchanged when it already ends with a slash, otherwise the path with
+   * a slash appended.
+   *
+   * @param path the path to normalize
+   * @return the path ending with a slash
+   */
+  private String ensureTrailingSlash(String path) {
+    if (path.endsWith(SLASH)) {
+      return path;
+    }
+    return path + SLASH;
+  }
+
+  /**
+   * Returns the table operations responsible for the format of the given table.
+   *
+   * <p>The lower-cased table format is taken from the format cache, which loads it from the
+   * table entity in the store on a miss.
+   *
+   * @param tableIdent the identifier of the table
+   * @return the table operations registered for the table's format
+   * @throws NoSuchTableException if the table is not found in the store
+   * @throws IllegalArgumentException if the table has no format, or if no table operations are
+   *     registered for its format
+   * @throws RuntimeException if the format cannot be loaded for any other reason
+   */
+  private ManagedTableOperations tableOps(NameIdentifier tableIdent) {
+    String tableFormat;
+    try {
+      tableFormat =
+          tableFormatCache.get(
+              tableIdent,
+              () -> {
+                TableEntity table = store.get(tableIdent, TABLE, TableEntity.class);
+                String format = table.properties().getOrDefault(Table.PROPERTY_TABLE_FORMAT, null);
+                Preconditions.checkArgument(
+                    format != null, "Table format for %s is null, this is unexpected", tableIdent);
+
+                return format.toLowerCase(Locale.ROOT);
+              });
+    } catch (Exception e) {
+      // The cache wraps failures of the loader, so the interesting exception is the cause. Fall
+      // back to the exception itself when it has no cause, so the original failure is never lost.
+      Throwable t = e.getCause() != null ? e.getCause() : e;
+
+      if (t instanceof NoSuchEntityException) {
+        throw new NoSuchTableException("Table %s does not exist", tableIdent);
+      } else if (t instanceof IllegalArgumentException) {
+        throw (IllegalArgumentException) t;
+      } else if (t instanceof IOException) {
+        throw new RuntimeException(
+            String.format("Failed to load table %s: %s", tableIdent, t.getMessage()), t);
+      } else {
+        throw new RuntimeException(
+            String.format("Unexpected exception when loading table %s", tableIdent), t);
+      }
+    }
+
+    // Check the supplier before dereferencing it: an unregistered format must be reported as an
+    // illegal argument, not as a NullPointerException. This lookup is deliberately outside the
+    // try block so that its failures are not mistaken for table loading failures.
+    Supplier<ManagedTableOperations> opsSupplier = tableOpsCache.get(tableFormat);
+    Preconditions.checkArgument(
+        opsSupplier != null, "No table operations found for table format %s", tableFormat);
+
+    ManagedTableOperations ops = opsSupplier.get();
+    Preconditions.checkArgument(
+        ops != null, "No table operations found for table format %s", tableFormat);
+    return ops;
+  }
+}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogPropertiesMetadata.java
similarity index 67%
rename from 
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
rename to 
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogPropertiesMetadata.java
index b8c3958e9a..71daef161b 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogPropertiesMetadata.java
@@ -17,21 +17,19 @@
  * under the License.
  */
 
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
-import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
 import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
-public class GenericLakehouseCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
-
-  public static final String LAKEHOUSE_LOCATION = "location";
+public class GenericCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
@@ -39,18 +37,11 @@ public class GenericLakehouseCatalogPropertiesMetadata 
extends BaseCatalogProper
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringOptionalPropertyEntry(
-                LAKEHOUSE_LOCATION,
-                "The root directory of the lakehouse catalog.",
+                Catalog.PROPERTY_LOCATION,
+                "The root directory of the generic catalog.",
                 false /* immutable */,
                 null, /* defaultValue */
-                false /* hidden */),
-            PropertyEntry.stringOptionalPropertyPrefixEntry(
-                LANCE_TABLE_STORAGE_OPTION_PREFIX,
-                "The storage options passed to Lance table.",
-                false /* immutable */,
-                null /* default value*/,
-                false /* hidden */,
-                false /* reserved */));
+                false /* hidden */));
 
     PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, 
PropertyEntry::getName);
   }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericSchemaPropertiesMetadata.java
similarity index 65%
copy from 
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
copy to 
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericSchemaPropertiesMetadata.java
index 3dd0abf81d..585cf8babc 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericSchemaPropertiesMetadata.java
@@ -16,21 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
-import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
 import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
+import org.apache.gravitino.Schema;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
-public class GenericLakehouseSchemaPropertiesMetadata extends 
BasePropertiesMetadata {
-  public static final String LAKEHOUSE_LOCATION =
-      GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION;
+public class GenericSchemaPropertiesMetadata extends BasePropertiesMetadata {
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
@@ -38,18 +36,11 @@ public class GenericLakehouseSchemaPropertiesMetadata 
extends BasePropertiesMeta
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringOptionalPropertyEntry(
-                LAKEHOUSE_LOCATION,
-                "The root directory of the lakehouse schema.",
+                Schema.PROPERTY_LOCATION,
+                "The root directory of the schema.",
                 false /* immutable */,
                 null, /* defaultValue */
-                false /* hidden */),
-            PropertyEntry.stringOptionalPropertyPrefixEntry(
-                LANCE_TABLE_STORAGE_OPTION_PREFIX,
-                "The storage options passed to Lance table.",
-                false /* immutable */,
-                null /* default value*/,
-                false /* hidden */,
-                false /* reserved */));
+                false /* hidden */));
 
     PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, 
PropertyEntry::getName);
   }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
similarity index 56%
rename from 
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
rename to 
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
index 3dd0abf81d..14cd263ecd 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
@@ -16,21 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
-import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
 import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.rel.Table;
 
-public class GenericLakehouseSchemaPropertiesMetadata extends 
BasePropertiesMetadata {
-  public static final String LAKEHOUSE_LOCATION =
-      GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION;
+public class GenericTablePropertiesMetadata extends BasePropertiesMetadata {
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
@@ -38,24 +39,32 @@ public class GenericLakehouseSchemaPropertiesMetadata 
extends BasePropertiesMeta
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringOptionalPropertyEntry(
-                LAKEHOUSE_LOCATION,
-                "The root directory of the lakehouse schema.",
+                Table.PROPERTY_LOCATION,
+                "The root directory of the generic table.",
                 false /* immutable */,
                 null, /* defaultValue */
                 false /* hidden */),
-            PropertyEntry.stringOptionalPropertyPrefixEntry(
-                LANCE_TABLE_STORAGE_OPTION_PREFIX,
-                "The storage options passed to Lance table.",
-                false /* immutable */,
-                null /* default value*/,
-                false /* hidden */,
-                false /* reserved */));
+            stringRequiredPropertyEntry(
+                Table.PROPERTY_TABLE_FORMAT,
+                "The format of the table",
+                true /* immutable */,
+                false /* hidden */));
 
     PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, 
PropertyEntry::getName);
   }
 
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return PROPERTIES_METADATA;
+    // Get all the table specific property entries from the registered table 
delegators, and merge
+    // them with the generic table properties.
+    Map<String, PropertyEntry<?>> tableSpecificPropertyEntries =
+        LakehouseTableDelegatorFactory.tableDelegators().entrySet().stream()
+            .flatMap(kv -> kv.getValue().tablePropertyEntries().stream())
+            .collect(Collectors.toMap(PropertyEntry::getName, entry -> entry));
+
+    return ImmutableMap.<String, PropertyEntry<?>>builder()
+        .putAll(PROPERTIES_METADATA)
+        .putAll(tableSpecificPropertyEntries)
+        .build();
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegator.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegator.java
new file mode 100644
index 0000000000..0e1d1fbec4
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegator.java
@@ -0,0 +1,60 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.generic;
+
+import java.util.List;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * A delegator interface through which a lakehouse table format provides its format name, its
+ * format-specific table property definitions and its table operations.
+ */
+public interface LakehouseTableDelegator {
+
+  /**
+   * Returns the table format name handled by this delegator.
+   *
+   * <p>NOTE(review): {@link GenericCatalogOperations} lower-cases the table format before
+   * looking up the table operations, so the returned name presumably has to be lower case to be
+   * reachable.
+   *
+   * @return the table format name
+   */
+  String tableFormat();
+
+  /**
+   * Returns the list of property entries specific to the table format.
+   *
+   * @return the list of property entries
+   */
+  List<PropertyEntry<?>> tablePropertyEntries();
+
+  /**
+   * Create the managed table operations for the specific table format. This method should return
+   * a new instance of {@link ManagedTableOperations} each time it is called. The returned
+   * instance will be used in {@link GenericCatalogOperations} to perform the specific table
+   * operation.
+   *
+   * @param store the entity store
+   * @param schemaOps the managed schema operations
+   * @param idGenerator the ID generator
+   * @return the managed table operations
+   */
+  ManagedTableOperations createTableOps(
+      EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator 
idGenerator);
+}
diff --git 
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegatorFactory.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegatorFactory.java
new file mode 100644
index 0000000000..3ec700e374
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegatorFactory.java
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.generic;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory that discovers the {@link LakehouseTableDelegator} implementations through {@link
+ * ServiceLoader} and exposes them keyed by their table format name.
+ */
+public class LakehouseTableDelegatorFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(LakehouseTableDelegatorFactory.class);
+
+  private static ImmutableMap<String, LakehouseTableDelegator> delegators;
+
+  private LakehouseTableDelegatorFactory() {}
+
+  /**
+   * Loads the delegators once. The thread context class loader is used when it is set, otherwise
+   * the class loader of {@link LakehouseTableDelegator}.
+   *
+   * @throws IllegalArgumentException if no delegator implementation is found
+   */
+  private static synchronized void createTableDelegators() {
+    if (delegators != null) {
+      return;
+    }
+
+    ClassLoader cl =
+        Optional.ofNullable(Thread.currentThread().getContextClassLoader())
+            .orElse(LakehouseTableDelegator.class.getClassLoader());
+    ServiceLoader<LakehouseTableDelegator> loader =
+        ServiceLoader.load(LakehouseTableDelegator.class, cl);
+
+    // Instantiate every provider exactly once, so the format name used as the key and the
+    // delegator stored as the value come from the same instance.
+    ImmutableMap<String, LakehouseTableDelegator> loaded =
+        ImmutableMap.copyOf(
+            loader.stream()
+                .map(ServiceLoader.Provider::get)
+                .collect(
+                    Collectors.toMap(
+                        LakehouseTableDelegator::tableFormat, delegator -> delegator)));
+
+    // Validate before publishing: otherwise a failed load would leave an empty map behind and
+    // every later call would silently return it instead of failing again.
+    Preconditions.checkArgument(
+        !loaded.isEmpty(),
+        "No LakehouseTableDelegator implementation found via ServiceLoader, this is unexpected, "
+            + "please check the code to see what is going on.");
+    delegators = loaded;
+
+    LOG.info("Loaded LakehouseTableDelegators: {}", delegators.keySet());
+  }
+
+  /**
+   * Returns the registered table delegators keyed by table format name, loading them on first
+   * use.
+   *
+   * @return the immutable map from table format name to delegator
+   * @throws IllegalArgumentException if no delegator implementation is found
+   */
+  public static ImmutableMap<String, LakehouseTableDelegator> tableDelegators() {
+    // Initialize delegators if not yet done
+    createTableDelegators();
+    return delegators;
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java
new file mode 100644
index 0000000000..d23322eb17
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * {@link LakehouseTableDelegator} for the Lance table format: it reports the format name,
+ * declares the Lance-specific table property entries and creates {@link LanceTableOperations}.
+ */
+public class LanceTableDelegator implements LakehouseTableDelegator {
+
+  public static final String LANCE_TABLE_FORMAT = "lance";
+
+  public static final String PROPERTY_LANCE_TABLE_REGISTER = "lance.register";
+
+  public static final String PROPERTY_LANCE_STORAGE_OPTIONS_PREFIX = "lance.storage.";
+
+  @Override
+  public String tableFormat() {
+    return LANCE_TABLE_FORMAT;
+  }
+
+  /** Returns the storage-options prefix entry followed by the registration flag entry. */
+  @Override
+  public List<PropertyEntry<?>> tablePropertyEntries() {
+    return ImmutableList.of(storageOptionsEntry(), registerEntry());
+  }
+
+  @Override
+  public ManagedTableOperations createTableOps(
+      EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator idGenerator) {
+    return new LanceTableOperations(store, schemaOps, idGenerator);
+  }
+
+  /** Entry covering every property whose key starts with the Lance storage options prefix. */
+  private static PropertyEntry<?> storageOptionsEntry() {
+    return PropertyEntry.stringOptionalPropertyPrefixEntry(
+        PROPERTY_LANCE_STORAGE_OPTIONS_PREFIX,
+        "The storage options passed to Lance table.",
+        false /* immutable */,
+        null /* default value*/,
+        false /* hidden */,
+        false /* reserved */);
+  }
+
+  /** Entry for the boolean flag that marks a create call as a table registration. */
+  private static PropertyEntry<?> registerEntry() {
+    return PropertyEntry.booleanPropertyEntry(
+        PROPERTY_LANCE_TABLE_REGISTER,
+        "Whether this is a table registration operation.",
+        false,
+        true /* immutable */,
+        false /* defaultValue */,
+        false /* hidden */,
+        false);
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java
new file mode 100644
index 0000000000..2eed5288be
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java
@@ -0,0 +1,282 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.base.Preconditions;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.index.DistanceType;
+import com.lancedb.lance.index.IndexParams;
+import com.lancedb.lance.index.IndexType;
+import com.lancedb.lance.index.vector.VectorIndexParams;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
+import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.storage.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ManagedTableOperations} for Lance tables. Table metadata is handled by the parent class;
+ * this class additionally creates, indexes and drops the Lance dataset stored at the table's
+ * {@link Table#PROPERTY_LOCATION}.
+ */
+public class LanceTableOperations extends ManagedTableOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(LanceTableOperations.class);
+
+  private final EntityStore store;
+
+  private final ManagedSchemaOperations schemaOps;
+
+  private final IdGenerator idGenerator;
+
+  public LanceTableOperations(
+      EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator idGenerator) {
+    this.store = store;
+    this.schemaOps = schemaOps;
+    this.idGenerator = idGenerator;
+  }
+
+  @Override
+  protected EntityStore store() {
+    return store;
+  }
+
+  @Override
+  protected SupportsSchemas schemas() {
+    return schemaOps;
+  }
+
+  @Override
+  protected IdGenerator idGenerator() {
+    return idGenerator;
+  }
+
+  /**
+   * Creates a Lance table. Unless the {@link LanceTableDelegator#PROPERTY_LANCE_TABLE_REGISTER}
+   * property is true, a new Lance dataset is created at the table location before the table
+   * metadata is created; with the property set, only the metadata is created.
+   *
+   * <p>If the dataset was created by this call and the metadata creation is then rejected, the
+   * new dataset is dropped again so that it does not stay behind without metadata.
+   *
+   * @throws IllegalArgumentException if the location property is missing or blank
+   * @throws NoSuchSchemaException if the parent implementation reports a missing schema
+   * @throws TableAlreadyExistsException if the table metadata already exists, or a Lance dataset
+   *     already exists at the location
+   * @throws RuntimeException if creating the dataset fails for any other reason
+   */
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    String location = properties.get(Table.PROPERTY_LOCATION);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(location), "Table location must be specified");
+    Map<String, String> storageProps = LancePropertiesUtils.getLanceStorageOptions(properties);
+
+    boolean register =
+        Optional.ofNullable(properties.get(LanceTableDelegator.PROPERTY_LANCE_TABLE_REGISTER))
+            .map(Boolean::parseBoolean)
+            .orElse(false);
+    if (register) {
+      // If this is a registration operation, just create the table metadata without creating a
+      // new dataset.
+      return super.createTable(
+          ident, columns, comment, properties, partitions, distribution, sortOrders, indexes);
+    }
+
+    // Set once Dataset.create has returned, so the handlers below only ever drop a dataset that
+    // this call created and never one that was already present at the location.
+    boolean datasetCreated = false;
+    // The allocator is declared first so it is closed after the dataset; both are closed before
+    // any catch block below runs.
+    try (RootAllocator allocator = new RootAllocator();
+        Dataset ignored =
+            Dataset.create(
+                allocator,
+                location,
+                convertColumnsToArrowSchema(columns),
+                new WriteParams.Builder().withStorageOptions(storageProps).build())) {
+      datasetCreated = true;
+      // Only create the table metadata in Gravitino after the Lance dataset is successfully
+      // created.
+      return super.createTable(
+          ident, columns, comment, properties, partitions, distribution, sortOrders, indexes);
+    } catch (NoSuchSchemaException e) {
+      if (datasetCreated) {
+        dropOrphanedDataset(location, storageProps, e);
+      }
+      throw e;
+    } catch (TableAlreadyExistsException e) {
+      // If the table metadata already exists, but the underlying lance table was just created
+      // successfully, we need to clean up the created lance table to avoid orphaned datasets.
+      if (datasetCreated) {
+        dropOrphanedDataset(location, storageProps, e);
+      }
+      throw e;
+    } catch (IllegalArgumentException e) {
+      // The message may be null, so guard before inspecting it.
+      String message = e.getMessage();
+      if (message != null && message.contains("Dataset already exists")) {
+        throw new TableAlreadyExistsException(
+            e, "Lance dataset already exists at location %s", location);
+      }
+      if (datasetCreated) {
+        dropOrphanedDataset(location, storageProps, e);
+      }
+      throw e;
+    } catch (Exception e) {
+      // NOTE(review): the dataset is deliberately not dropped here because it is unclear
+      // whether the metadata was persisted before the failure - confirm the desired behavior.
+      throw new RuntimeException("Failed to create Lance dataset at location " + location, e);
+    }
+  }
+
+  /**
+   * Alters a Lance table. Only {@link TableChange.AddIndex} changes are accepted: each index is
+   * first created on the Lance dataset and the changes are then applied to the table metadata.
+   *
+   * @throws IllegalArgumentException if any change is not an {@link TableChange.AddIndex}
+   * @throws RuntimeException if adding the indexes to the Lance dataset fails
+   */
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    // Lance only supports adding indexes for now.
+    boolean onlyAddIndex =
+        Arrays.stream(changes).allMatch(change -> change instanceof TableChange.AddIndex);
+    Preconditions.checkArgument(onlyAddIndex, "Only adding indexes is supported for Lance tables");
+
+    // Every change is an AddIndex at this point, so the cast below cannot fail.
+    List<Index> addedIndexes =
+        Arrays.stream(changes)
+            .map(
+                change -> {
+                  TableChange.AddIndex addIndexChange = (TableChange.AddIndex) change;
+                  return Indexes.IndexImpl.builder()
+                      .withIndexType(addIndexChange.getType())
+                      .withName(addIndexChange.getName())
+                      .withFieldNames(addIndexChange.getFieldNames())
+                      .build();
+                })
+            .collect(Collectors.toList());
+
+    Table loadedTable = super.loadTable(ident);
+    addLanceIndex(loadedTable, addedIndexes);
+    // After adding the index to the Lance dataset, we need to update the table metadata in
+    // Gravitino. If there's any failure during this process, the code will throw an exception
+    // and the update won't be applied in Gravitino.
+    return super.alterTable(ident, changes);
+  }
+
+  /**
+   * Purges the table metadata and, when that succeeds, drops the Lance dataset at the table
+   * location.
+   *
+   * @return true if the metadata was purged, false if it was not or the table does not exist
+   * @throws RuntimeException if loading, purging or dropping the dataset fails
+   */
+  @Override
+  public boolean purgeTable(NameIdentifier ident) {
+    try {
+      Table table = loadTable(ident);
+      String location = table.properties().get(Table.PROPERTY_LOCATION);
+
+      boolean purged = super.purgeTable(ident);
+      // If the table metadata is purged successfully, we can delete the Lance dataset.
+      // Otherwise, we should not delete the dataset.
+      if (purged) {
+        // Delete the Lance dataset at the location
+        Dataset.drop(location, LancePropertiesUtils.getLanceStorageOptions(table.properties()));
+        LOG.info("Deleted Lance dataset at location {}", location);
+      }
+
+      return purged;
+
+    } catch (NoSuchTableException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to purge Lance dataset for table " + ident, e);
+    }
+  }
+
+  /**
+   * Drops the table metadata. For a table whose {@link Table#PROPERTY_EXTERNAL} property is not
+   * true, the Lance dataset at the table location is dropped as well once the metadata is gone.
+   *
+   * @return true if the metadata was dropped, false if it was not or the table does not exist
+   * @throws RuntimeException if loading, dropping the metadata or dropping the dataset fails
+   */
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    try {
+      Table table = loadTable(ident);
+      boolean external =
+          Optional.ofNullable(table.properties().get(Table.PROPERTY_EXTERNAL))
+              .map(Boolean::parseBoolean)
+              .orElse(false);
+
+      boolean dropped = super.dropTable(ident);
+      if (external) {
+        return dropped;
+      }
+
+      // If the table metadata is dropped successfully, and the table is not external, we can
+      // delete the Lance dataset. Otherwise, we should not delete the dataset.
+      if (dropped) {
+        String location = table.properties().get(Table.PROPERTY_LOCATION);
+
+        // Delete the Lance dataset at the location
+        Dataset.drop(location, LancePropertiesUtils.getLanceStorageOptions(table.properties()));
+        LOG.info("Deleted Lance dataset at location {}", location);
+      }
+
+      return dropped;
+
+    } catch (NoSuchTableException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to drop Lance dataset for table " + ident, e);
+    }
+  }
+
+  /**
+   * Best-effort removal of a Lance dataset that this class created but could not pair with table
+   * metadata. A failure to drop is logged and attached to {@code cause} as a suppressed exception
+   * so that the original error stays the one reported to the caller.
+   */
+  private static void dropOrphanedDataset(
+      String location, Map<String, String> storageProps, Exception cause) {
+    try {
+      Dataset.drop(location, storageProps);
+    } catch (RuntimeException dropFailure) {
+      LOG.warn("Failed to clean up Lance dataset at location {}", location, dropFailure);
+      cause.addSuppressed(dropFailure);
+    }
+  }
+
+  /** Converts each column to an Arrow field and wraps the fields in an Arrow schema. */
+  private org.apache.arrow.vector.types.pojo.Schema convertColumnsToArrowSchema(Column[] columns) {
+    List<Field> fields =
+        Arrays.stream(columns)
+            .map(
+                col ->
+                    LanceDataTypeConverter.CONVERTER.toArrowField(
+                        col.name(), col.dataType(), col.nullable()))
+            .collect(Collectors.toList());
+    return new org.apache.arrow.vector.types.pojo.Schema(fields);
+  }
+
+  /**
+   * Opens the Lance dataset at the table location and creates every given index on it.
+   *
+   * @throws RuntimeException if the dataset cannot be opened or an index cannot be created
+   */
+  private void addLanceIndex(Table table, List<Index> addedIndexes) {
+    String location = table.properties().get(Table.PROPERTY_LOCATION);
+    // NOTE(review): the dataset is opened without the table's Lance storage options, unlike
+    // createTable/dropTable - confirm whether they need to be passed here as well.
+    try (RootAllocator allocator = new RootAllocator();
+        Dataset dataset = Dataset.open(location, allocator)) {
+      // For Lance, we only support adding indexes, so in fact, we can't handle drop index here.
+      for (Index index : addedIndexes) {
+        IndexType indexType = IndexType.valueOf(index.type().name());
+        IndexParams indexParams = getIndexParamsByIndexType(indexType);
+
+        dataset.createIndex(
+            Arrays.stream(index.fieldNames())
+                .map(field -> String.join(".", field))
+                .collect(Collectors.toList()),
+            indexType,
+            Optional.of(index.name()),
+            indexParams,
+            true);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to add indexes to Lance dataset at location " + location, e);
+    }
+  }
+
+  /**
+   * Returns the index parameters used for the given Lance index type.
+   *
+   * @throws IllegalArgumentException if the index type is neither SCALAR nor VECTOR
+   */
+  private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+    switch (indexType) {
+      case SCALAR:
+        return IndexParams.builder().build();
+      case VECTOR:
+        // TODO make these parameters configurable
+        int numberOfDimensions = 3; // this value should be determined dynamically based on the data
+        // Add properties to Index to set this value.
+        return IndexParams.builder()
+            .setVectorIndexParams(
+                VectorIndexParams.ivfPq(2, 8, numberOfDimensions, DistanceType.L2, 2))
+            .build();
+      default:
+        throw new IllegalArgumentException("Unsupported index type: " + indexType);
+    }
+  }
+}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
 
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
similarity index 92%
copy from 
catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
copy to 
catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
index 927e28b4fd..f0aeb0cb23 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
@@ -16,4 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-org.apache.gravitino.catalog.lakehouse.GenericLakehouseCatalog
+org.apache.gravitino.catalog.lakehouse.generic.GenericCatalog
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
 
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
similarity index 92%
rename from 
catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
rename to 
catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
index 927e28b4fd..352813f4b1 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
@@ -16,4 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-org.apache.gravitino.catalog.lakehouse.GenericLakehouseCatalog
+org.apache.gravitino.catalog.lakehouse.lance.LanceTableDelegator
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/resources/generic-lakehouse.conf 
b/catalogs/catalog-lakehouse-generic/src/main/resources/lakehouse-generic.conf
similarity index 100%
rename from 
catalogs/catalog-generic-lakehouse/src/main/resources/generic-lakehouse.conf
rename to 
catalogs/catalog-lakehouse-generic/src/main/resources/lakehouse-generic.conf
diff --git 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestGenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
similarity index 97%
rename from 
catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestGenericLakehouseCatalogOperations.java
rename to 
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
index 67887c2f7a..4c71f4adb1 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestGenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
 import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
 import static 
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
@@ -68,7 +68,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class TestGenericLakehouseCatalogOperations {
+public class TestGenericCatalogOperations {
   private static final String STORE_PATH =
       "/tmp/gravitino_test_entityStore_" + 
UUID.randomUUID().toString().replace("-", "");
   private static final String METALAKE_NAME = "metalake_for_lakehouse_test";
@@ -76,7 +76,7 @@ public class TestGenericLakehouseCatalogOperations {
 
   private static EntityStore store;
   private static IdGenerator idGenerator;
-  private static GenericLakehouseCatalogOperations ops;
+  private static GenericCatalogOperations ops;
 
   @BeforeAll
   public static void setUp() throws IOException {
@@ -132,7 +132,7 @@ public class TestGenericLakehouseCatalogOperations {
             .build();
     store.put(catalog, false);
 
-    ops = new GenericLakehouseCatalogOperations(store);
+    ops = new GenericCatalogOperations(store, idGenerator);
   }
 
   @AfterAll
diff --git 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestPropertiesMetadata.java
similarity index 52%
rename from 
catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
rename to 
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestPropertiesMetadata.java
index 8dfd3b5ce0..75eb613dc5 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestPropertiesMetadata.java
@@ -17,86 +17,66 @@
  *  under the License.
  */
 
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.gravitino.Schema;
 import org.apache.gravitino.connector.PropertiesMetadata;
+import org.apache.gravitino.rel.Table;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 public class TestPropertiesMetadata {
-  public static GenericLakehouseCatalog genericLakehouseCatalog;
+  public static GenericCatalog genericCatalog;
 
   @BeforeAll
   static void init() {
-    genericLakehouseCatalog = new GenericLakehouseCatalog();
+    genericCatalog = new GenericCatalog();
   }
 
   @Test
   void testCatalogPropertiesMetadata() {
-    PropertiesMetadata catalogPropertiesMetadata =
-        genericLakehouseCatalog.catalogPropertiesMetadata();
+    PropertiesMetadata catalogPropertiesMetadata = 
genericCatalog.catalogPropertiesMetadata();
     Assertions.assertNotNull(catalogPropertiesMetadata);
 
-    Map<String, String> catalogProperties =
-        ImmutableMap.of(
-            "storage.type", "s3",
-            "storage.s3.bucket", "my-bucket",
-            "storage.s3.region", "us-west-2",
-            "location", "/tmp/test1");
+    Map<String, String> catalogProperties = ImmutableMap.of("location", 
"/tmp/test1");
 
     String catalogLocation =
         (String)
             catalogPropertiesMetadata.getOrDefault(
-                catalogProperties, 
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
+                catalogProperties, GenericCatalog.PROPERTY_LOCATION);
     Assertions.assertEquals("/tmp/test1", catalogLocation);
   }
 
   @Test
   void testSchemaPropertiesMetadata() {
-    PropertiesMetadata schemaPropertiesMetadata =
-        genericLakehouseCatalog.schemaPropertiesMetadata();
+    PropertiesMetadata schemaPropertiesMetadata = 
genericCatalog.schemaPropertiesMetadata();
     Assertions.assertNotNull(schemaPropertiesMetadata);
 
-    Map<String, String> schemaProperties =
-        ImmutableMap.of(
-            "storage.type", "s3",
-            "storage.s3.bucket", "my-bucket",
-            "storage.s3.region", "us-west-2",
-            "location", "/tmp/test_schema");
+    Map<String, String> schemaProperties = ImmutableMap.of("location", 
"/tmp/test_schema");
 
     String schemaLocation =
-        (String)
-            schemaPropertiesMetadata.getOrDefault(
-                schemaProperties, 
GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
+        (String) schemaPropertiesMetadata.getOrDefault(schemaProperties, 
Schema.PROPERTY_LOCATION);
     Assertions.assertEquals("/tmp/test_schema", schemaLocation);
   }
 
   @Test
   void testTablePropertiesMetadata() {
-    PropertiesMetadata tablePropertiesMetadata = 
genericLakehouseCatalog.tablePropertiesMetadata();
+    PropertiesMetadata tablePropertiesMetadata = 
genericCatalog.tablePropertiesMetadata();
     Assertions.assertNotNull(tablePropertiesMetadata);
 
     Map<String, String> tableProperties =
         ImmutableMap.of(
-            "storage.type", "s3",
-            "storage.s3.bucket", "my-bucket",
-            "storage.s3.region", "us-west-2",
+            "lance.storage.type", "s3",
+            "lance.storage.s3.bucket", "my-bucket",
+            "lance.storage.s3.region", "us-west-2",
             "location", "/tmp/test_table",
             "format", "iceberg");
 
     String tableLocation =
-        (String)
-            tablePropertiesMetadata.getOrDefault(
-                tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
+        (String) tablePropertiesMetadata.getOrDefault(tableProperties, 
Table.PROPERTY_LOCATION);
     Assertions.assertEquals("/tmp/test_table", tableLocation);
-
-    LakehouseTableFormat tableFormat =
-        (LakehouseTableFormat)
-            tablePropertiesMetadata.getOrDefault(
-                tableProperties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
-    Assertions.assertEquals(LakehouseTableFormat.ICEBERG, tableFormat);
   }
 }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
similarity index 97%
rename from 
catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
rename to 
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
index 0c92b43eef..01b0ce2aed 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse.integration.test;
+package org.apache.gravitino.catalog.lakehouse.lance.integration.test;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
@@ -53,7 +53,6 @@ import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.rel.Column;
@@ -83,10 +82,11 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CatalogGenericLakehouseLanceIT extends BaseIT {
-  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogGenericLakehouseLanceIT.class);
+public class CatalogGenericCatalogLanceIT extends BaseIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogGenericCatalogLanceIT.class);
   public static final String metalakeName =
       GravitinoITUtils.genRandomName("CatalogGenericLakeLanceIT_metalake");
+
   public String catalogName = 
GravitinoITUtils.genRandomName("CatalogGenericLakeLanceI_catalog");
   public String SCHEMA_PREFIX = "CatalogGenericLakeLance_schema";
   public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
@@ -96,7 +96,7 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT {
   public static final String LANCE_COL_NAME1 = "lance_col_name1";
   public static final String LANCE_COL_NAME2 = "lance_col_name2";
   public static final String LANCE_COL_NAME3 = "lance_col_name3";
-  protected final String provider = "generic-lakehouse";
+  protected final String provider = "lakehouse-generic";
   protected GravitinoMetalake metalake;
   protected Catalog catalog;
   protected String tempDirectory;
@@ -146,7 +146,7 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT {
   }
 
   @Test
-  public void testCreateLanceTable() throws InterruptedException {
+  public void testCreateLanceTable() {
     // Create a table from Gravitino API
     Column[] columns = createColumns();
     NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
@@ -284,7 +284,7 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT {
     Assertions.assertTrue(e.getMessage().contains("Invalid user input"));
 
     Assertions.assertThrows(
-        NoSuchTableException.class, () -> 
catalog.asTableCatalog().loadTable(newNameIdentifier));
+        RuntimeException.class, () -> 
catalog.asTableCatalog().loadTable(newNameIdentifier));
   }
 
   @Test
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 6289bd3404..7446135834 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -42,10 +42,8 @@ import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.StringIdentifier;
-import org.apache.gravitino.catalog.CatalogManager.CatalogWrapper;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -115,18 +113,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
    */
   @Override
   public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
-    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    if (isManagedTable(catalogIdent)) {
-      return TreeLockUtils.doWithTreeLock(
-          ident,
-          LockType.READ,
-          () ->
-              doWithCatalog(
-                  catalogIdent,
-                  c -> c.doWithTableOps(t -> t.loadTable(ident)),
-                  NoSuchTableException.class));
-    }
-
     EntityCombinedTable entityCombinedTable =
         TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> 
internalLoadTable(ident));
 
@@ -250,9 +236,8 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   NoSuchTableException.class,
                   IllegalArgumentException.class);
 
-          if (isManagedTable(catalogIdent)) {
-            // For generic lakehouse catalog, all operations will be 
dispatched to the underlying
-            // catalog.
+          boolean isManagedTable = isManagedEntity(catalogIdent, 
Capability.Scope.TABLE);
+          if (isManagedTable) {
             return EntityCombinedTable.of(alteredTable)
                 .withHiddenProperties(
                     getHiddenPropertyNames(
@@ -341,19 +326,17 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
         LockType.WRITE,
         () -> {
           NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-          if (isManagedTable(catalogIdent)) {
-            return doWithCatalog(
-                catalogIdent,
-                c -> c.doWithTableOps(t -> t.dropTable(ident)),
-                RuntimeException.class);
-          }
-
           boolean droppedFromCatalog =
               doWithCatalog(
                   catalogIdent,
                   c -> c.doWithTableOps(t -> t.dropTable(ident)),
                   RuntimeException.class);
 
+          boolean isManagedTable = isManagedEntity(catalogIdent, 
Capability.Scope.TABLE);
+          if (isManagedTable) {
+            return droppedFromCatalog;
+          }
+
           // For unmanaged table, it could happen that the table:
           // 1. Is not found in the catalog (dropped directly from underlying 
sources)
           // 2. Is found in the catalog but not in the store (not managed by 
Gravitino)
@@ -362,20 +345,14 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
           // In all situations, we try to delete the table from the store, 
but we don't take the
           // return value of the store operation into account. We only take 
the return value of the
           // catalog into account.
-          //
-          // For managed table, we should take the return value of the store 
operation into account.
-          boolean droppedFromStore = false;
           try {
-            droppedFromStore = store.delete(ident, TABLE);
+            store.delete(ident, TABLE);
           } catch (NoSuchEntityException e) {
             LOG.warn("The table to be dropped does not exist in the store: 
{}", ident, e);
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
-
-          return isManagedEntity(catalogIdent, Capability.Scope.TABLE)
-              ? droppedFromStore
-              : droppedFromCatalog;
+          return droppedFromCatalog;
         });
   }
 
@@ -408,7 +385,8 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   RuntimeException.class,
                   UnsupportedOperationException.class);
 
-          if (isManagedTable(catalogIdent)) {
+          boolean isManagedTable = isManagedEntity(catalogIdent, 
Capability.Scope.TABLE);
+          if (isManagedTable) {
             return droppedFromCatalog;
           }
 
@@ -420,21 +398,15 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
           // In all situations, we try to delete the table from the store, 
but we don't take the
           // return value of the store operation into account. We only take 
the return value of the
           // catalog into account.
-          //
-          // For managed table, we should take the return value of the store 
operation into account.
-          boolean droppedFromStore;
           try {
-            droppedFromStore = store.delete(ident, TABLE);
+            store.delete(ident, TABLE);
           } catch (NoSuchEntityException e) {
             LOG.warn("The table to be purged does not exist in the store: {}", 
ident, e);
             return false;
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
-
-          return isManagedEntity(catalogIdent, Capability.Scope.TABLE)
-              ? droppedFromStore
-              : droppedFromCatalog;
+          return droppedFromCatalog;
         });
   }
 
@@ -530,6 +502,18 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             c -> c.doWithTableOps(t -> t.loadTable(ident)),
             NoSuchTableException.class);
 
+    boolean isManagedTable = isManagedEntity(catalogIdentifier, 
Capability.Scope.TABLE);
+    if (isManagedTable) {
+      return EntityCombinedTable.of(table)
+          .withHiddenProperties(
+              getHiddenPropertyNames(
+                  catalogIdentifier,
+                  HasPropertyMetadata::tablePropertiesMetadata,
+                  table.properties()))
+          // The metadata of managed table is stored by Gravitino, so it is 
always imported.
+          .withImported(true /* imported */);
+    }
+
     StringIdentifier stringId = getStringIdFromProperties(table.properties());
     // Case 1: The table is not created by Gravitino or the external system 
does not support storing
     // string identifier.
@@ -554,9 +538,9 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   catalogIdentifier,
                   HasPropertyMetadata::tablePropertiesMetadata,
                   table.properties()))
-          // Some tables don't have properties or are not created by Gravitino,
-          // we can't use stringIdentifier to judge whether schema is ever 
imported or not.
-          // We need to check whether the entity exists.
+          // For some catalogs like PG, the identifier information is not 
stored in the table's
+          // metadata, so we need to check whether this table exists in the 
store; if it does, we
+          // don't need to import it.
           .withImported(true);
     }
 
@@ -596,31 +580,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                 }),
         IllegalArgumentException.class);
 
-    if (isManagedTable(catalogIdent)) {
-      // For generic lakehouse catalog, all operations will be dispatched to 
the underlying catalog.
-      Table table =
-          doWithCatalog(
-              catalogIdent,
-              c ->
-                  c.doWithTableOps(
-                      t ->
-                          t.createTable(
-                              ident,
-                              columns,
-                              comment,
-                              properties,
-                              partitions == null ? EMPTY_TRANSFORM : 
partitions,
-                              distribution == null ? Distributions.NONE : 
distribution,
-                              sortOrders == null ? new SortOrder[0] : 
sortOrders,
-                              indexes == null ? Indexes.EMPTY_INDEXES : 
indexes)),
-              NoSuchSchemaException.class,
-              TableAlreadyExistsException.class);
-      return EntityCombinedTable.of(table)
-          .withHiddenProperties(
-              getHiddenPropertyNames(
-                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
-    }
-
     long uid = idGenerator.nextId();
     // Add StringIdentifier to the properties, the specific catalog will 
handle this
     // StringIdentifier to make sure only when the operation is successful, 
the related
@@ -649,6 +608,15 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             NoSuchSchemaException.class,
             TableAlreadyExistsException.class);
 
+    // If the table is managed by Gravitino, we don't need to create 
TableEntity and store it again.
+    boolean isManagedTable = isManagedEntity(catalogIdent, 
Capability.Scope.TABLE);
+    if (isManagedTable) {
+      return EntityCombinedTable.of(table)
+          .withHiddenProperties(
+              getHiddenPropertyNames(
+                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+    }
+
     AuditInfo audit =
         AuditInfo.builder()
             .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
@@ -671,12 +639,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     try {
       store.put(tableEntity, true /* overwrite */);
     } catch (Exception e) {
-      if (isManagedTable(catalogIdent)) {
-        // Drop table
-        doWithCatalog(
-            catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)), 
RuntimeException.class);
-      }
-
       LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
       return EntityCombinedTable.of(table)
           .withHiddenProperties(
@@ -684,7 +646,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
     }
 
-    // For managed table, we can use table entity to indicate the table is 
created successfully.
+    // Merge both the metadata from catalog operation and the metadata from 
entity store.
     return EntityCombinedTable.of(table, tableEntity)
         .withHiddenProperties(
             getHiddenPropertyNames(
@@ -699,15 +661,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             .collect(Collectors.toList());
   }
 
-  private boolean isManagedTable(NameIdentifier catalogIdent) {
-    CatalogManager catalogManager = 
GravitinoEnv.getInstance().catalogManager();
-    CatalogWrapper wrapper = catalogManager.loadCatalogAndWrap(catalogIdent);
-    Capability capability = wrapper.catalog().capability();
-
-    CapabilityResult result = 
capability.managedStorage(Capability.Scope.TABLE);
-    return result == CapabilityResult.SUPPORTED;
-  }
-
   private boolean isSameColumn(Column left, int columnPosition, ColumnEntity 
right) {
     return Objects.equal(left.name(), right.name())
         && columnPosition == right.position()
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
deleted file mode 100644
index 4fb7b5d12d..0000000000
--- 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.connector;
-
-import org.apache.gravitino.tag.SupportsTags;
-
-public class GenericLakehouseColumn extends BaseColumn {
-  @Override
-  public SupportsTags supportsTags() {
-    return super.supportsTags();
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder extends BaseColumnBuilder<Builder, 
GenericLakehouseColumn> {
-
-    /** Creates a new instance of {@link Builder}. */
-    private Builder() {}
-
-    /**
-     * Internal method to build a HiveColumn instance using the provided 
values.
-     *
-     * @return A new HiveColumn instance with the configured values.
-     */
-    @Override
-    protected GenericLakehouseColumn internalBuild() {
-      GenericLakehouseColumn hiveColumn = new GenericLakehouseColumn();
-
-      hiveColumn.name = name;
-      hiveColumn.comment = comment;
-      hiveColumn.dataType = dataType;
-      hiveColumn.nullable = nullable;
-      hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : 
defaultValue;
-      hiveColumn.autoIncrement = autoIncrement;
-      return hiveColumn;
-    }
-  }
-}
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
deleted file mode 100644
index 2d14a539e2..0000000000
--- 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.connector;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.gravitino.rel.Table;
-
-public class GenericLakehouseTable extends BaseTable {
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  @Override
-  protected TableOperations newOps() throws UnsupportedOperationException {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  public static class Builder extends BaseTableBuilder<Builder, 
GenericLakehouseTable> {
-
-    private String format;
-
-    public Builder withFormat(String format) {
-      this.format = format;
-      return this;
-    }
-
-    @Override
-    protected GenericLakehouseTable internalBuild() {
-      GenericLakehouseTable genericLakehouseTable = new 
GenericLakehouseTable();
-      genericLakehouseTable.columns = this.columns;
-      genericLakehouseTable.comment = this.comment;
-
-      if (format != null) {
-        genericLakehouseTable.properties =
-            ImmutableMap.<String, String>builder()
-                .putAll(this.properties)
-                .put(Table.PROPERTY_TABLE_FORMAT, this.format)
-                .buildKeepingLast();
-      } else {
-        genericLakehouseTable.properties = this.properties;
-      }
-      genericLakehouseTable.auditInfo = this.auditInfo;
-      genericLakehouseTable.distribution = this.distribution;
-      genericLakehouseTable.indexes = this.indexes;
-      genericLakehouseTable.name = this.name;
-      genericLakehouseTable.partitioning = this.partitioning;
-      genericLakehouseTable.sortOrders = this.sortOrders;
-      return genericLakehouseTable;
-    }
-  }
-}
diff --git a/lance/lance-common/build.gradle.kts 
b/lance/lance-common/build.gradle.kts
index 8bb80c1934..e3a669212c 100644
--- a/lance/lance-common/build.gradle.kts
+++ b/lance/lance-common/build.gradle.kts
@@ -46,3 +46,13 @@ dependencies {
   testImplementation(libs.junit.jupiter.params)
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
+
+tasks.test {
+  val skipITs = project.hasProperty("skipITs")
+  if (skipITs) {
+    // Exclude integration tests
+    exclude("**/integration/test/**")
+  } else {
+    dependsOn(tasks.jar)
+  }
+}
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
index 961134ec66..ee21c8cb1b 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
@@ -222,7 +222,7 @@ public class GravitinoLanceNameSpaceOperations implements 
LanceNamespaceOperatio
           client.createCatalog(
               catalogName,
               Catalog.Type.RELATIONAL,
-              "generic-lakehouse",
+              "lakehouse-generic",
               "created by Lance REST server",
               properties);
       response.setProperties(
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index a1fee0a73e..d90bf08196 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -93,7 +93,7 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper {
 
   public boolean isLakehouseCatalog(Catalog catalog) {
     return catalog.type().equals(Catalog.Type.RELATIONAL)
-        && "generic-lakehouse".equals(catalog.provider());
+        && "lakehouse-generic".equals(catalog.provider());
   }
 
   public Catalog loadAndValidateLakehouseCatalog(String catalogName) {
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
index d298dbe5e5..8bb6bf37ea 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
@@ -127,7 +127,8 @@ public class GravitinoLanceTableOperations implements 
LanceTableOperations {
       createTableProperties.put(LANCE_LOCATION, tableLocation);
     }
     // The format is defined in GenericLakehouseCatalog
-    createTableProperties.put("format", "lance");
+    createTableProperties.put(Table.PROPERTY_TABLE_FORMAT, "lance");
+    createTableProperties.put(Table.PROPERTY_EXTERNAL, "true");
 
     Table t;
     try {
diff --git a/lance/lance-rest-server/build.gradle.kts 
b/lance/lance-rest-server/build.gradle.kts
index 6a825360f3..a74fe4819d 100644
--- a/lance/lance-rest-server/build.gradle.kts
+++ b/lance/lance-rest-server/build.gradle.kts
@@ -114,7 +114,17 @@ tasks {
   test {
     val testMode = project.properties["testMode"] as? String ?: "embedded"
     if (testMode == "embedded") {
-      dependsOn(":catalogs:catalog-generic-lakehouse:build")
+      dependsOn(":catalogs:catalog-lakehouse-generic:build")
     }
   }
 }
+
+tasks.test {
+  val skipITs = project.hasProperty("skipITs")
+  if (skipITs) {
+    // Exclude integration tests
+    exclude("**/integration/test/**")
+  } else {
+    dependsOn(tasks.jar)
+  }
+}
diff --git 
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
 
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
index a63bb39307..51826c3add 100644
--- 
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
+++ 
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
@@ -65,6 +65,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Schema;
@@ -111,9 +112,9 @@ public class LanceRESTServiceIT extends BaseIT {
   }
 
   @AfterAll
-  public void clean() {
+  public void clean() throws IOException {
     client.dropMetalake(getLanceRESTServerMetalakeName(), true);
-    tempDir.toFile().deleteOnExit();
+    FileUtils.deleteDirectory(tempDir.toFile());
   }
 
   @AfterEach
@@ -557,11 +558,8 @@ public class LanceRESTServiceIT extends BaseIT {
     // Create a table without location should fail
     CreateTableRequest noLocationRequest = new CreateTableRequest();
     noLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, 
"no_location_table"));
-    LanceNamespaceException noLocationException =
-        Assertions.assertThrows(
-            LanceNamespaceException.class, () -> 
ns.createTable(noLocationRequest, body));
-    Assertions.assertTrue(
-        noLocationException.getMessage().contains("No location specified for 
table"));
+    Assertions.assertThrows(
+        LanceNamespaceException.class, () -> ns.createTable(noLocationRequest, 
body));
 
     // Create table with invalid schema should fail
     byte[] invalidBody = "".getBytes(Charset.defaultCharset());
@@ -696,7 +694,7 @@ public class LanceRESTServiceIT extends BaseIT {
     return metalake.createCatalog(
         catalogName,
         Catalog.Type.RELATIONAL,
-        "generic-lakehouse",
+        "lakehouse-generic",
         "catalog for lance rest service tests",
         properties);
   }
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4a3a5d468a..98f6df0361 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -32,6 +32,7 @@ include("catalogs:hive-metastore-common")
 include("catalogs:catalog-lakehouse-iceberg")
 include("catalogs:catalog-lakehouse-paimon")
 include("catalogs:catalog-lakehouse-hudi")
+include("catalogs:catalog-lakehouse-generic")
 include(
   "catalogs:catalog-jdbc-common",
   "catalogs:catalog-jdbc-doris",
@@ -51,7 +52,6 @@ include(
   "clients:client-python",
   "clients:cli"
 )
-include("catalogs:catalog-generic-lakehouse")
 if (gradle.startParameter.projectProperties["enableFuse"]?.toBoolean() == 
true) {
   include("clients:filesystem-fuse")
 } else {


Reply via email to