This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by 
this push:
     new d093824b72 [#8838] feat(catalogs): Support create/load/list table 
operation for lance table. (#8879)
d093824b72 is described below

commit d093824b7268aad1735ead114e5e760f28832fea
Author: Mini Yu <[email protected]>
AuthorDate: Fri Oct 24 22:56:03 2025 +0800

    [#8838] feat(catalogs): Support create/load/list table operation for lance 
table. (#8879)
    
    ### What changes were proposed in this pull request?
    
    Add support create and load table operations for lance table.
    
    ### Why are the changes needed?
    
    It's a need.
    
    Fix: #8838
    Fix: #8837
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    Currently, I have only tested it locally
    
    ```shell
    ➜  [/Users/yuqi/Downloads] curl -X POST -H "Accept: 
application/vnd.gravitino.v1+json" \
    -H "Content-Type: application/json" -d '{
      "name": "lance_table14",
      "comment": "This is an example table",
      "columns": [
        {
          "name": "id",
          "type": "integer",
          "comment": "id column comment",
          "nullable": false,
          "autoIncrement": true,
          "defaultValue": {
            "type": "literal",
            "dataType": "integer",
            "value": "-1"
          }
        }
      ],
      "indexes": [
        {
          "indexType": "primary_key",
          "name": "PRIMARY",
          "fieldNames": [["id"]]
        }
      ],
      "properties": {
        "format": "lance",
        "location": "/tmp/lance_catalog/schema/lance_table14"
      }
    }' 
http://localhost:8090/api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables
    {"code":0,"table":{"name":"lance_table14","comment":"This is an example 
table","columns":[{"name":"id","type":"integer","comment":"id column 
comment","nullable":false,"autoIncrement":true,"defaultValue":{"type":"literal","dataType":"integer","value":"-1"}}],"properties":{"format":"lance","location":"/tmp/lance_catalog/schema/lance_table14/"},"audit":{"creator":"anonymous","createTime":"2025-10-23T03:18:39.123151Z"},"distribution":{"strategy":"none","number":0,"funcArgs":[]},"sortOrder
 [...]
    ➜  [/Users/yuqi/Downloads] curl -X GET -H "Accept: 
application/vnd.gravitino.v1+json" \
    -H "Content-Type: application/json" 
http://localhost:8090/api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables/lance_table14
    {"code":0,"table":{"name":"lance_table14","comment":"This is an example 
table","columns":[{"name":"id","type":"integer","comment":"id column 
comment","nullable":false,"autoIncrement":false,"defaultValue":{"type":"literal","dataType":"integer","value":"-1"}}],"properties":{"format":"lance","location":"/tmp/lance_catalog/schema/lance_table14/"},"audit":{"creator":"anonymous","createTime":"2025-10-23T03:18:39.123151Z"},"distribution":{"strategy":"none","number":0,"funcArgs":[]},"sortOrde
 [...]
    -H "Content-Type: application/json" 
http://localhost:8090/api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables
    
{"code":0,"identifiers":[{"namespace":["test","lance_catalog","schema"],"name":"lance_table10"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table11"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table12"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table13"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table14"}]}
    ➜  [/Users/yuqi/Downloads]
    ```
    
    And the lance location
    ```shell
    ➜  [/tmp/lance_catalog/schema] ls
    lance_table10 lance_table11 lance_table12 lance_table13 lance_table14
    ➜  [/tmp/lance_catalog/schema] cd lance_table14
    ➜  [/tmp/lance_catalog/schema/lance_table14] ls -al
    total 0
    drwxr-xr-x@ 4 yuqi  wheel  128 10 23 11:18 .
    drwxr-xr-x@ 7 yuqi  wheel  224 10 23 11:18 ..
    drwxr-xr-x@ 3 yuqi  wheel   96 10 23 11:18 _transactions
    drwxr-xr-x@ 3 yuqi  wheel   96 10 23 11:18 _versions
    ➜  [/tmp/lance_catalog/schema/lance_table14] ls -al _versions
    total 8
    drwxr-xr-x@ 3 yuqi  wheel   96 10 23 11:18 .
    drwxr-xr-x@ 4 yuqi  wheel  128 10 23 11:18 ..
    -rw-r--r--@ 1 yuqi  wheel  225 10 23 11:18 1.manifest
    ➜  [/tmp/lance_catalog/schema/lance_table14]
    ```
---
 api/build.gradle.kts                               |   2 +
 .../org/apache/gravitino/rel/GenericTable.java     |  47 ++++++
 .../org/apache/gravitino/rel/indexes/Indexes.java  |  87 ++++++++++
 .../java/org/apache/gravitino/rel/TestIndex.java   |  57 +++++++
 .../catalog-generic-lakehouse/build.gradle.kts     |   1 +
 .../GenericLakehouseCatalogOperations.java         | 134 ++++++++++++++-
 .../lakehouse/LakehouseCatalogOperations.java      |  25 +++
 .../lakehouse/lance/LanceCatalogOperations.java    | 173 +++++++++++++++++++
 .../lakehouse/lance/LanceDataTypeConverter.java    | 123 ++++++++++++++
 .../apache/gravitino/config/ConfigConstants.java   |   5 +-
 .../catalog/TableOperationDispatcher.java          |  74 +++++++-
 .../connector/GenericLakehouseColumn.java          |  56 +++++++
 .../gravitino/connector/GenericLakehouseTable.java |  86 ++++++++++
 .../apache/gravitino/meta/GenericTableEntity.java  | 186 +++++++++++++++++++++
 .../org/apache/gravitino/meta/TableEntity.java     |  10 +-
 .../relational/mapper/TableVersionMapper.java      |  36 ++++
 .../mapper/TableVersionSQLProviderFactory.java     |  62 +++++++
 .../provider/DefaultMapperPackageProvider.java     |   4 +-
 .../provider/base/TableMetaBaseSQLProvider.java    |  41 +++--
 .../provider/base/TableVersionBaseSQLProvider.java |  79 +++++++++
 .../postgresql/TableVersionPostgreSQLProvider.java |  24 +++
 .../gravitino/storage/relational/po/TablePO.java   |  46 +++++
 .../relational/service/TableMetaService.java       |  16 +-
 .../storage/relational/utils/POConverters.java     |  61 ++++++-
 .../storage/relational/utils/SessionUtils.java     |  12 ++
 scripts/h2/schema-1.1.0-h2.sql                     |  19 ---
 scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql           |  36 ++--
 scripts/mysql/schema-1.1.0-mysql.sql               |  19 ---
 scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql     |  36 ++--
 scripts/postgresql/schema-1.1.0-postgresql.sql     |  19 ---
 .../upgrade-1.0.0-to-1.1.0-postgresql.sql          |  36 ++--
 31 files changed, 1466 insertions(+), 146 deletions(-)

diff --git a/api/build.gradle.kts b/api/build.gradle.kts
index b4399b13c0..f0fe3ba5ee 100644
--- a/api/build.gradle.kts
+++ b/api/build.gradle.kts
@@ -26,6 +26,8 @@ dependencies {
   implementation(libs.commons.lang3)
   implementation(libs.commons.collections4)
   implementation(libs.guava)
+  implementation(libs.jackson.annotations)
+  implementation(libs.jackson.databind)
 
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java 
b/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
new file mode 100644
index 0000000000..4796421c53
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.rel;
+
+/** A generic table interface that extends the Table interface. */
+public interface GenericTable extends Table {
+
+  /**
+   * Formats the table as a string representation.
+   *
+   * @return the formatted string representation of the table
+   */
+  String format();
+
+  /**
+   * Gets the location of the table.
+   *
+   * @return the location of the table
+   */
+  String location();
+
+  /**
+   * Indicates whether the table is external.
+   *
+   * @return true if the table is external, false otherwise
+   */
+  default boolean external() {
+    return false;
+  }
+}
diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java 
b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
index ce10fd0a0f..d1b1a1f523 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
@@ -18,6 +18,22 @@
  */
 package org.apache.gravitino.rel.indexes;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
 /** Helper methods to create index to pass into Apache Gravitino. */
 public class Indexes {
 
@@ -73,10 +89,81 @@ public class Indexes {
         .build();
   }
 
+  /** Custom JSON serializer for Index objects. */
+  public static class IndexSerializer extends JsonSerializer<Index> {
+    @Override
+    public void serialize(Index value, JsonGenerator gen, SerializerProvider 
serializers)
+        throws IOException {
+      gen.writeStartObject();
+      gen.writeStringField("indexType", 
value.type().name().toUpperCase(Locale.ROOT));
+      if (null != value.name()) {
+        gen.writeStringField("name", value.name());
+      }
+      gen.writeFieldName("fieldNames");
+      gen.writeObject(value.fieldNames());
+      gen.writeEndObject();
+    }
+  }
+
+  /** Custom JSON deserializer for Index objects. */
+  public static class IndexDeserializer extends JsonDeserializer<Index> {
+
+    @Override
+    public Index deserialize(JsonParser p, DeserializationContext ctxt) throws 
IOException {
+      JsonNode node = p.getCodec().readTree(p);
+      Preconditions.checkArgument(
+          node != null && !node.isNull() && node.isObject(),
+          "Index must be a valid JSON object, but found: %s",
+          node);
+
+      IndexImpl.Builder builder = IndexImpl.builder();
+      Preconditions.checkArgument(
+          node.has("indexType"), "Cannot parse index from missing type: %s", 
node);
+      String indexType = getString("indexType", node);
+      
builder.withIndexType(Index.IndexType.valueOf(indexType.toUpperCase(Locale.ROOT)));
+      if (node.has("name")) {
+        builder.withName(getString("name", node));
+      }
+      Preconditions.checkArgument(
+          node.has("fieldNames"), "Cannot parse index from missing field 
names: %s", node);
+      List<String[]> fieldNames = Lists.newArrayList();
+      node.get("fieldNames").forEach(field -> 
fieldNames.add(getStringArray((ArrayNode) field)));
+      builder.withFieldNames(fieldNames.toArray(new String[0][0]));
+      return builder.build();
+    }
+
+    private static String[] getStringArray(ArrayNode node) {
+      String[] array = new String[node.size()];
+      for (int i = 0; i < node.size(); i++) {
+        array[i] = node.get(i).asText();
+      }
+      return array;
+    }
+
+    private static String getString(String property, JsonNode node) {
+      Preconditions.checkArgument(node.has(property), "Cannot parse missing 
string: %s", property);
+      JsonNode pNode = node.get(property);
+      return convertToString(property, pNode);
+    }
+
+    private static String convertToString(String property, JsonNode pNode) {
+      Preconditions.checkArgument(
+          pNode != null && !pNode.isNull() && pNode.isTextual(),
+          "Cannot parse to a string value %s: %s",
+          property,
+          pNode);
+      return pNode.asText();
+    }
+  }
+
   /** The user side implementation of the index. */
+  @JsonSerialize(using = IndexSerializer.class)
+  @JsonDeserialize(using = IndexDeserializer.class)
   public static final class IndexImpl implements Index {
     private final IndexType indexType;
+
     private final String name;
+
     private final String[][] fieldNames;
 
     /**
diff --git a/api/src/test/java/org/apache/gravitino/rel/TestIndex.java 
b/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
new file mode 100644
index 0000000000..4a807fbb7b
--- /dev/null
+++ b/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.rel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.cfg.EnumFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestIndex {
+
+  @Test
+  void testIndexSerialization() throws JsonProcessingException {
+    String[][] fields = {{"column1"}, {"column2", "subcolumn"}};
+    Index index = Indexes.unique("test_index", fields);
+
+    JsonMapper jsonMapper =
+        JsonMapper.builder()
+            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+            .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
+            .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
+            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+            
.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .build();
+
+    String json = jsonMapper.writeValueAsString(index);
+
+    Index deserializedIndex = jsonMapper.readValue(json, IndexImpl.class);
+    Assertions.assertEquals(index.type(), deserializedIndex.type());
+    Assertions.assertEquals(index.name(), deserializedIndex.name());
+    Assertions.assertArrayEquals(index.fieldNames(), 
deserializedIndex.fieldNames());
+  }
+}
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts 
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index fceac14304..704dbda7e3 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,6 +43,7 @@ dependencies {
   implementation(libs.commons.lang3)
   implementation(libs.guava)
   implementation(libs.hadoop3.client.api)
+  implementation(libs.lance)
 
   annotationProcessor(libs.lombok)
 
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index b626aabc16..acac35528e 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -18,11 +18,17 @@
  */
 package org.apache.gravitino.catalog.lakehouse;
 
+import static org.apache.gravitino.Entity.EntityType.TABLE;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
@@ -30,16 +36,20 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.SupportsSchemas;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
@@ -61,6 +71,11 @@ public class GenericLakehouseCatalogOperations
   @SuppressWarnings("unused") // todo: remove this after implementing table 
operations
   private Optional<Path> catalogLakehouseDir;
 
+  private static final Map<String, LakehouseCatalogOperations> 
SUPPORTED_FORMATS =
+      Maps.newHashMap();
+
+  private CatalogInfo catalogInfo;
+  private HasPropertyMetadata propertiesMetadata;
   /**
    * Initializes the generic lakehouse catalog operations with the provided 
configuration.
    *
@@ -141,7 +156,25 @@ public class GenericLakehouseCatalogOperations
 
   @Override
   public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    NameIdentifier identifier = NameIdentifier.of(namespace.levels());
+    try {
+      store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
+    } catch (NoSuchTableException e) {
+      throw new NoSuchEntityException(e, "Schema %s does not exist", 
namespace);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to get schema " + identifier);
+    }
+
+    try {
+      List<GenericTableEntity> tableEntityList =
+          store.list(namespace, GenericTableEntity.class, TABLE);
+      return tableEntityList.stream()
+          .map(e -> NameIdentifier.of(namespace, e.name()))
+          .toArray(NameIdentifier[]::new);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
+    }
   }
 
   @Override
@@ -160,7 +193,66 @@ public class GenericLakehouseCatalogOperations
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    String format = properties.getOrDefault("format", "lance");
+    String tableLocation = calculateTableLocation(ident, properties);
+    Map<String, String> newProperties = Maps.newHashMap(properties);
+    newProperties.put("location", tableLocation);
+
+    LakehouseCatalogOperations lakehouseCatalogOperations =
+        SUPPORTED_FORMATS.compute(
+            format,
+            (k, v) ->
+                v == null
+                    ? createLakehouseCatalogOperations(
+                        format, properties, catalogInfo, propertiesMetadata)
+                    : v);
+
+    return lakehouseCatalogOperations.createTable(
+        ident, columns, comment, newProperties, partitions, distribution, 
sortOrders, indexes);
+  }
+
+  private String calculateTableLocation(
+      NameIdentifier tableIdent, Map<String, String> tableProperties) {
+    String tableLocation = tableProperties.get("location");
+    if (StringUtils.isNotBlank(tableLocation)) {
+      return ensureTrailingSlash(tableLocation);
+    }
+
+    String schemaLocation;
+    try {
+      Schema schema = 
loadSchema(NameIdentifier.of(tableIdent.namespace().levels()));
+      schemaLocation = schema.properties().get("location");
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load schema for table %s to determine default 
location.", tableIdent),
+          e);
+    }
+
+    // If we do not set location in table properties, and schema location is 
set, use schema
+    // location
+    // as the base path.
+    if (StringUtils.isNotBlank(schemaLocation)) {
+      return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
+    }
+
+    // If the schema location is not set, use catalog lakehouse dir as the 
base path. Or else, throw
+    // an exception.
+    if (catalogLakehouseDir.isEmpty()) {
+      throw new RuntimeException(
+          String.format(
+              "No location specified for table %s, you need to set location 
either in catalog, schema, or table properties",
+              tableIdent));
+    }
+
+    String catalogLakehousePath = catalogLakehouseDir.get().toString();
+    String[] nsLevels = tableIdent.namespace().levels();
+    String schemaName = nsLevels[nsLevels.length - 1];
+    return ensureTrailingSlash(catalogLakehousePath)
+        + schemaName
+        + SLASH
+        + tableIdent.name()
+        + SLASH;
   }
 
   @Override
@@ -171,10 +263,46 @@ public class GenericLakehouseCatalogOperations
 
   @Override
   public boolean dropTable(NameIdentifier ident) {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    Namespace namespace = ident.namespace();
+    try {
+      GenericTableEntity tableEntity =
+          store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+      Map<String, String> tableProperties = tableEntity.getProperties();
+      String format = tableProperties.getOrDefault("format", "lance");
+      LakehouseCatalogOperations lakehouseCatalogOperations =
+          SUPPORTED_FORMATS.compute(
+              format,
+              (k, v) ->
+                  v == null
+                      ? createLakehouseCatalogOperations(
+                          format, tableProperties, catalogInfo, 
propertiesMetadata)
+                      : v);
+      return lakehouseCatalogOperations.dropTable(ident);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
+    }
   }
 
   private String ensureTrailingSlash(String path) {
     return path.endsWith(SLASH) ? path : path + SLASH;
   }
+
+  private LakehouseCatalogOperations createLakehouseCatalogOperations(
+      String format,
+      Map<String, String> properties,
+      CatalogInfo catalogInfo,
+      HasPropertyMetadata propertiesMetadata) {
+    LakehouseCatalogOperations operations;
+    switch (format.toLowerCase()) {
+      case "lance":
+        operations = new LanceCatalogOperations();
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported lakehouse format: 
" + format);
+    }
+
+    operations.initialize(properties, catalogInfo, propertiesMetadata);
+    return operations;
+  }
 }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
new file mode 100644
index 0000000000..66c7147626
--- /dev/null
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
@@ -0,0 +1,25 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse;
+
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.rel.TableCatalog;
+
+public interface LakehouseCatalogOperations extends CatalogOperations, 
TableCatalog {}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
new file mode 100644
index 0000000000..3e1146b7ad
--- /dev/null
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.collect.ImmutableMap;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.GenericLakehouseTable;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LanceCatalogOperations implements LakehouseCatalogOperations {
+
+  private Map<String, String> lancePropertiesMap;
+
+  @Override
+  public void initialize(
+      Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
+      throws RuntimeException {
+    lancePropertiesMap = ImmutableMap.copyOf(config);
+  }
+
+  @Override
+  public void testConnection(
+      NameIdentifier catalogIdent,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties)
+      throws Exception {}
+
+  @Override
+  public void close() throws IOException {}
+
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
+    return new NameIdentifier[0];
+  }
+
+  @Override
+  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    // Should not come here.
+    return null;
+  }
+
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    // Ignore partitions, distributions, sortOrders, and indexes for Lance 
tables;
+    String location = properties.get("location");
+    try (Dataset dataset =
+        Dataset.create(
+            new RootAllocator(),
+            location,
+            convertColumnsToSchema(columns),
+            new WriteParams.Builder().build())) {
+      GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
+      return builder
+          .withName(ident.name())
+          .withColumns(columns)
+          .withComment(comment)
+          .withProperties(properties)
+          .withDistribution(distribution)
+          .withIndexes(indexes)
+          .withAuditInfo(
+              AuditInfo.builder()
+                  .withCreator(PrincipalUtils.getCurrentUserName())
+                  .withCreateTime(Instant.now())
+                  .build())
+          .withPartitioning(partitions)
+          .withSortOrders(sortOrders)
+          .withFormat("lance")
+          .build();
+    }
+  }
+
+  private org.apache.arrow.vector.types.pojo.Schema 
convertColumnsToSchema(Column[] columns) {
+    LanceDataTypeConverter converter = new LanceDataTypeConverter();
+    List<Field> fields =
+        Arrays.stream(columns)
+            .map(
+                col -> {
+                  boolean nullable = col.nullable();
+                  if (nullable) {
+                    return new org.apache.arrow.vector.types.pojo.Field(
+                        col.name(),
+                        org.apache.arrow.vector.types.pojo.FieldType.nullable(
+                            converter.fromGravitino(col.dataType())),
+                        null);
+                  }
+
+                  // not nullable
+                  return new org.apache.arrow.vector.types.pojo.Field(
+                      col.name(),
+                      org.apache.arrow.vector.types.pojo.FieldType.notNullable(
+                          converter.fromGravitino(col.dataType())),
+                      null);
+                })
+            .collect(Collectors.toList());
+    return new org.apache.arrow.vector.types.pojo.Schema(fields);
+  }
+
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    // Use another PRs to implement alter table for Lance tables
+    return null;
+  }
+
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    try {
+      String location = lancePropertiesMap.get("location");
+      // Remove the directory on storage
+      FileSystem fs = FileSystem.get(new Configuration());
+      return fs.delete(new Path(location), true);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to drop Lance table: " + 
ident.name(), e);
+    }
+  }
+}
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
new file mode 100644
index 0000000000..117863659e
--- /dev/null
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
@@ -0,0 +1,123 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.gravitino.connector.DataTypeConverter;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.FixedType;
+
+public class LanceDataTypeConverter implements DataTypeConverter<ArrowType, 
ArrowType> {
+
+  @Override
+  public ArrowType fromGravitino(Type type) {
+    switch (type.name()) {
+      case BOOLEAN:
+        return Bool.INSTANCE;
+      case BYTE:
+        return new Int(8, true);
+      case SHORT:
+        return new Int(16, true);
+      case INTEGER:
+        return new Int(32, true);
+      case LONG:
+        return new Int(64, true);
+      case FLOAT:
+        return new FloatingPoint(FloatingPointPrecision.SINGLE);
+      case DOUBLE:
+        return new FloatingPoint(FloatingPointPrecision.DOUBLE);
+      case DECIMAL:
+        // Lance uses FIXED_SIZE_BINARY for decimal types
+        return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for 
decimal
+      case DATE:
+        return new ArrowType.Date(DateUnit.DAY);
+      case TIME:
+        return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+      case TIMESTAMP:
+        return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+      case VARCHAR:
+      case STRING:
+        return new ArrowType.Utf8();
+      case FIXED:
+        FixedType fixedType = (FixedType) type;
+        return new ArrowType.FixedSizeBinary(fixedType.length());
+      case BINARY:
+        return new ArrowType.Binary();
+      default:
+        throw new UnsupportedOperationException("Unsupported Gravitino type: " 
+ type.name());
+    }
+  }
+
+  @Override
+  public Type toGravitino(ArrowType arrowType) {
+    if (arrowType instanceof Bool) {
+      return Types.BooleanType.get();
+    } else if (arrowType instanceof Int intType) {
+      switch (intType.getBitWidth()) {
+        case 8 -> {
+          return Types.ByteType.get();
+        }
+        case 16 -> {
+          return Types.ShortType.get();
+        }
+        case 32 -> {
+          return Types.IntegerType.get();
+        }
+        case 64 -> {
+          return Types.LongType.get();
+        }
+        default -> throw new UnsupportedOperationException(
+            "Unsupported Int bit width: " + intType.getBitWidth());
+      }
+    } else if (arrowType instanceof FloatingPoint floatingPoint) {
+      switch (floatingPoint.getPrecision()) {
+        case SINGLE:
+          return Types.FloatType.get();
+        case DOUBLE:
+          return Types.DoubleType.get();
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported FloatingPoint precision: " + 
floatingPoint.getPrecision());
+      }
+    } else if (arrowType instanceof ArrowType.FixedSizeBinary) {
+      ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) 
arrowType;
+      return Types.FixedType.of(fixedSizeBinary.getByteWidth());
+    } else if (arrowType instanceof ArrowType.Date) {
+      return Types.DateType.get();
+    } else if (arrowType instanceof ArrowType.Time) {
+      return Types.TimeType.get();
+    } else if (arrowType instanceof ArrowType.Timestamp) {
+      return Types.TimestampType.withoutTimeZone();
+    } else if (arrowType instanceof ArrowType.Utf8) {
+      return Types.StringType.get();
+    } else if (arrowType instanceof ArrowType.Binary) {
+      return Types.BinaryType.get();
+    } else {
+      throw new UnsupportedOperationException("Unsupported Arrow type: " + 
arrowType);
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java 
b/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index 17c08ac0e9..0ef761a6ca 100644
--- a/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -80,6 +80,9 @@ public final class ConfigConstants {
   /** The version number for the 1.0.0 release. */
   public static final String VERSION_1_0_0 = "1.0.0";
 
+  /** The version number for the 1.1.0 release. */
+  public static final String VERSION_1_1_0 = "1.1.0";
+
   /** The current version of backend storage initialization script. */
-  public static final String CURRENT_SCRIPT_VERSION = VERSION_1_0_0;
+  public static final String CURRENT_SCRIPT_VERSION = VERSION_1_1_0;
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index a777f2a511..1cbab0d6ed 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -27,6 +27,7 @@ import static 
org.apache.gravitino.utils.NameIdentifierUtil.getSchemaIdentifier;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
+import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,6 +37,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
@@ -52,8 +54,10 @@ import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.GenericTable;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -487,6 +491,19 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
   }
 
   private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    if (isGenericLakehouseCatalog(catalogIdent)) {
+      try {
+        GenericTableEntity tableEntity = store.get(ident, TABLE, 
GenericTableEntity.class);
+        if (tableEntity != null) {
+          GenericTable genericTable = tableEntity.toGenericTable();
+          return EntityCombinedTable.of(genericTable).withImported(true);
+        }
+      } catch (IOException ioe) {
+        throw new RuntimeException("Failed to load table entity " + ident, 
ioe);
+      }
+    }
+
     NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
     Table table =
         doWithCatalog(
@@ -597,18 +614,46 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             .mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, 
idGenerator.nextId(), audit))
             .collect(Collectors.toList());
 
-    TableEntity tableEntity =
-        TableEntity.builder()
-            .withId(uid)
-            .withName(ident.name())
-            .withNamespace(ident.namespace())
-            .withColumns(columnEntityList)
-            .withAuditInfo(audit)
-            .build();
+    TableEntity tableEntity;
+    if (isGenericLakehouseCatalog(catalogIdent)) {
+      // For generic lakehouse catalog, we only create the table entity with 
basic info.
+      GenericTable genericTable = (GenericTable) table;
+      tableEntity =
+          GenericTableEntity.getBuilder()
+              .withId(uid)
+              .withName(ident.name())
+              .withNamespace(ident.namespace())
+              .withFormat(genericTable.format())
+              .withAuditInfo(audit)
+              .withColumns(columnEntityList)
+              .withIndexes(table.index())
+              .withDistribution(table.distribution())
+              .withFormat(genericTable.format())
+              .withPartitions(table.partitioning())
+              .withSortOrder(table.sortOrder())
+              .withProperties(genericTable.properties())
+              .withComment(genericTable.comment())
+              .build();
+    } else {
+      tableEntity =
+          TableEntity.builder()
+              .withId(uid)
+              .withName(ident.name())
+              .withNamespace(ident.namespace())
+              .withColumns(columnEntityList)
+              .withAuditInfo(audit)
+              .build();
+    }
 
     try {
       store.put(tableEntity, true /* overwrite */);
     } catch (Exception e) {
+      if (isGenericLakehouseCatalog(catalogIdent)) {
+        // Drop table
+        doWithCatalog(
+            catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)), 
RuntimeException.class);
+      }
+
       LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
       return EntityCombinedTable.of(table)
           .withHiddenProperties(
@@ -616,6 +661,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                   catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
     }
 
+    // For managed table, we can use table entity to indicate the table is 
created successfully.
     return EntityCombinedTable.of(table, tableEntity)
         .withHiddenProperties(
             getHiddenPropertyNames(
@@ -630,6 +676,18 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             .collect(Collectors.toList());
   }
 
+  private boolean isGenericLakehouseCatalog(NameIdentifier catalogIdent) {
+    CatalogManager catalogManager = 
GravitinoEnv.getInstance().catalogManager();
+    try {
+      Catalog catalog = catalogManager.loadCatalog(catalogIdent);
+      return catalog.type() == Catalog.Type.RELATIONAL
+          && catalog.provider().equals("generic-lakehouse");
+    } catch (NoSuchEntityException e) {
+      LOG.warn("Catalog not found: {}", catalogIdent, e);
+      return false;
+    }
+  }
+
   private boolean isSameColumn(Column left, int columnPosition, ColumnEntity 
right) {
     return Objects.equal(left.name(), right.name())
         && columnPosition == right.position()
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
new file mode 100644
index 0000000000..b84b265256
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.tag.SupportsTags;
+
+public class GenericLakehouseColumn extends BaseColumn {
+  @Override
+  public SupportsTags supportsTags() {
+    return super.supportsTags();
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends BaseColumnBuilder<Builder, 
GenericLakehouseColumn> {
+
+    /** Creates a new instance of {@link Builder}. */
+    private Builder() {}
+
+    /**
+     * Internal method to build a HiveColumn instance using the provided 
values.
+     *
+     * @return A new HiveColumn instance with the configured values.
+     */
+    @Override
+    protected GenericLakehouseColumn internalBuild() {
+      GenericLakehouseColumn hiveColumn = new GenericLakehouseColumn();
+
+      hiveColumn.name = name;
+      hiveColumn.comment = comment;
+      hiveColumn.dataType = dataType;
+      hiveColumn.nullable = nullable;
+      hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : 
defaultValue;
+      return hiveColumn;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
new file mode 100644
index 0000000000..a9379a5b31
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.rel.GenericTable;
+
+public class GenericLakehouseTable extends BaseTable implements GenericTable {
+  @SuppressWarnings("unused")
+  private String schemaName;
+
+  private String format;
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public String format() {
+    return format;
+  }
+
+  @Override
+  public String location() {
+    return properties.get("location");
+  }
+
+  @Override
+  public boolean external() {
+    return properties.get("external") != null && 
Boolean.parseBoolean(properties.get("external"));
+  }
+
+  @Override
+  protected TableOperations newOps() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  public static class Builder extends BaseTableBuilder<Builder, 
GenericLakehouseTable> {
+
+    private String schemaName;
+    private String format;
+
+    public Builder withSchemaName(String schemaName) {
+      this.schemaName = schemaName;
+      return this;
+    }
+
+    public Builder withFormat(String format) {
+      this.format = format;
+      return this;
+    }
+
+    @Override
+    protected GenericLakehouseTable internalBuild() {
+      GenericLakehouseTable genericLakehouseTable = new 
GenericLakehouseTable();
+      genericLakehouseTable.schemaName = this.schemaName;
+      genericLakehouseTable.format = this.format;
+      genericLakehouseTable.columns = this.columns;
+      genericLakehouseTable.comment = this.comment;
+      genericLakehouseTable.properties = this.properties;
+      genericLakehouseTable.auditInfo = this.auditInfo;
+      genericLakehouseTable.distribution = this.distribution;
+      genericLakehouseTable.indexes = this.indexes;
+      genericLakehouseTable.name = this.name;
+      genericLakehouseTable.partitioning = this.partitioning;
+      genericLakehouseTable.sortOrders = this.sortOrders;
+      return genericLakehouseTable;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
new file mode 100644
index 0000000000..4b2dd9ad03
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.connector.GenericLakehouseColumn;
+import org.apache.gravitino.connector.GenericLakehouseTable;
+import org.apache.gravitino.rel.GenericTable;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+
+@Getter
+public class GenericTableEntity extends TableEntity {
+  public static final Field FORMAT = Field.required("format", Long.class, "The 
table's format");
+  public static final Field PROPERTIES =
+      Field.optional("properties", Map.class, "The table's properties");
+
+  public static final Field PARTITIONS =
+      Field.optional("partitions", Transform[].class, "The table's partition");
+
+  public static final Field SORT_ORDER =
+      Field.optional("sortOrders", SortOrder[].class, "The table's sort 
order");
+
+  public static final Field DISTRIBUTION =
+      Field.optional("distribution", Distribution.class, "The table's 
distribution");
+
+  public static final Field INDEXES =
+      Field.optional("indexes", Index[].class, "The table's indexes");
+
+  public static final Field COMMENT =
+      Field.optional("comment", String.class, "The table's comment");
+
+  public GenericTableEntity() {
+    super();
+  }
+
+  @Override
+  public Map<Field, Object> fields() {
+    Map<Field, Object> superFields = super.fields();
+    Map<Field, Object> result = Maps.newHashMap(superFields);
+    result.put(FORMAT, format);
+    result.put(PROPERTIES, properties);
+    result.put(PARTITIONS, partitions);
+    result.put(SORT_ORDER, sortOrder);
+    result.put(DISTRIBUTION, distribution);
+    result.put(INDEXES, indexes);
+    result.put(COMMENT, comment);
+
+    return result;
+  }
+
+  private String format;
+  @Getter private Map<String, String> properties;
+  private Transform[] partitions;
+  private SortOrder[] sortOrder;
+  private Distribution distribution;
+  private Index[] indexes;
+  private String comment;
+
+  public static class Builder {
+    private final GenericTableEntity tableEntity;
+
+    public Builder() {
+      this.tableEntity = new GenericTableEntity();
+    }
+
+    public Builder withId(Long id) {
+      tableEntity.id = id;
+      return this;
+    }
+
+    public Builder withName(String name) {
+      tableEntity.name = name;
+      return this;
+    }
+
+    public Builder withAuditInfo(AuditInfo auditInfo) {
+      tableEntity.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withColumns(java.util.List<ColumnEntity> columns) {
+      tableEntity.columns = columns;
+      return this;
+    }
+
+    public Builder withNamespace(org.apache.gravitino.Namespace namespace) {
+      tableEntity.namespace = namespace;
+      return this;
+    }
+
+    public Builder withFormat(String format) {
+      tableEntity.format = format;
+      return this;
+    }
+
+    public Builder withProperties(Map<String, String> properties) {
+      tableEntity.properties = properties;
+      return this;
+    }
+
+    public Builder withPartitions(Transform[] partitions) {
+      tableEntity.partitions = partitions;
+      return this;
+    }
+
+    public Builder withSortOrder(SortOrder[] sortOrder) {
+      tableEntity.sortOrder = sortOrder;
+      return this;
+    }
+
+    public Builder withDistribution(Distribution distribution) {
+      tableEntity.distribution = distribution;
+      return this;
+    }
+
+    public Builder withIndexes(Index[] indexes) {
+      tableEntity.indexes = indexes;
+      return this;
+    }
+
+    public Builder withComment(String comment) {
+      tableEntity.comment = comment;
+      return this;
+    }
+
+    public GenericTableEntity build() {
+      return tableEntity;
+    }
+  }
+
+  public static GenericTableEntity.Builder getBuilder() {
+    return new GenericTableEntity.Builder();
+  }
+
+  public GenericTable toGenericTable() {
+    return GenericLakehouseTable.builder()
+        .withFormat(format)
+        .withProperties(properties)
+        .withAuditInfo(auditInfo)
+        .withSortOrders(sortOrder)
+        .withPartitioning(partitions)
+        .withDistribution(distribution)
+        .withColumns(
+            columns.stream()
+                .map(this::toGenericLakehouseColumn)
+                .toArray(GenericLakehouseColumn[]::new))
+        .withIndexes(indexes)
+        .withName(name)
+        .withComment(comment)
+        .build();
+  }
+
+  private GenericLakehouseColumn toGenericLakehouseColumn(ColumnEntity 
columnEntity) {
+    return GenericLakehouseColumn.builder()
+        .withName(columnEntity.name())
+        .withComment(columnEntity.comment())
+        .withAutoIncrement(columnEntity.autoIncrement())
+        .withNullable(columnEntity.nullable())
+        .withType(columnEntity.dataType())
+        .withDefaultValue(columnEntity.defaultValue())
+        .build();
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
index 9d15be7df6..595defed08 100644
--- a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
@@ -42,15 +42,15 @@ public class TableEntity implements Entity, Auditable, 
HasIdentifier {
   public static final Field COLUMNS =
       Field.optional("columns", List.class, "The columns of the table");
 
-  private Long id;
+  protected Long id;
 
-  private String name;
+  protected String name;
 
-  private AuditInfo auditInfo;
+  protected AuditInfo auditInfo;
 
-  private Namespace namespace;
+  protected Namespace namespace;
 
-  private List<ColumnEntity> columns;
+  protected List<ColumnEntity> columns;
 
   /**
    * Returns a map of the fields and their corresponding values for this table.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
new file mode 100644
index 0000000000..a723c3db4a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+
+public interface TableVersionMapper {
+  String TABLE_NAME = "table_version_info";
+
+  @InsertProvider(type = TableVersionSQLProviderFactory.class, method = 
"insertTableVersion")
+  void insertTableVersion(@Param("tablePO") TablePO tablePO);
+
+  @InsertProvider(
+      type = TableVersionSQLProviderFactory.class,
+      method = "insertTableVersionOnDuplicateKeyUpdate")
+  void insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO 
tablePO);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
new file mode 100644
index 0000000000..ab27353c00
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.TableVersionPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionSQLProviderFactory {
+
+  private static final Map<JDBCBackendType, TableVersionBaseSQLProvider>
+      TABLE_VERSION_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new 
TableVersionSQLProviderFactory.TableVersionMySQLProvider(),
+              JDBCBackendType.H2, new 
TableVersionSQLProviderFactory.TableVersionH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
TableVersionPostgreSQLProvider());
+
+  public static TableVersionBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return TABLE_VERSION_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  static class TableVersionMySQLProvider extends TableVersionBaseSQLProvider {}
+
+  static class TableVersionH2Provider extends TableVersionBaseSQLProvider {}
+
+  public static String insertTableVersion(@Param("tablePO") TablePO tablePO) {
+    return getProvider().insertTableVersion(tablePO);
+  }
+
+  public static String 
insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO tablePO) {
+    return getProvider().insertTableVersionOnDuplicateKeyUpdate(tablePO);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
index f214bd1962..aaf22ccda8 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
@@ -41,6 +41,7 @@ import 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
@@ -78,6 +79,7 @@ public class DefaultMapperPackageProvider implements 
MapperPackageProvider {
         TagMetaMapper.class,
         TopicMetaMapper.class,
         UserMetaMapper.class,
-        UserRoleRelMapper.class);
+        UserRoleRelMapper.class,
+        TableVersionMapper.class);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 9360e2c354..8065476a61 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -21,20 +21,29 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.ibatis.annotations.Param;
 
 public class TableMetaBaseSQLProvider {
 
   public String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) {
-    return "SELECT table_id as tableId, table_name as tableName,"
-        + " metalake_id as metalakeId, catalog_id as catalogId,"
-        + " schema_id as schemaId, audit_info as auditInfo,"
-        + " current_version as currentVersion, last_version as lastVersion,"
-        + " deleted_at as deletedAt"
+    return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+        + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+        + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+        + " tm.current_version as currentVersion, tm.last_version as 
lastVersion,"
+        + " tm.deleted_at as deletedAt,"
+        + " tv.format as format, "
+        + " tv.properties as properties,"
+        + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+        + " tv.distribution as distribution, tv.indexes as indexes,"
+        + " tv.comment as comment"
         + " FROM "
         + TABLE_NAME
-        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+        + " tm LEFT JOIN "
+        + TableVersionMapper.TABLE_NAME
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = 
tv.version AND tv.deleted_at = 0"
+        + " WHERE tm.schema_id = #{schemaId} AND tm.deleted_at = 0";
   }
 
   public String listTablePOsByTableIds(List<Long> tableIds) {
@@ -65,14 +74,22 @@ public class TableMetaBaseSQLProvider {
 
   public String selectTableMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name) {
-    return "SELECT table_id as tableId, table_name as tableName,"
-        + " metalake_id as metalakeId, catalog_id as catalogId,"
-        + " schema_id as schemaId, audit_info as auditInfo,"
-        + " current_version as currentVersion, last_version as lastVersion,"
-        + " deleted_at as deletedAt"
+    return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+        + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+        + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+        + " tm.current_version as currentVersion, tm.last_version as 
lastVersion,"
+        + " tm.deleted_at as deletedAt,"
+        + " tv.format as format, "
+        + " tv.properties as properties,"
+        + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+        + " tv.distribution as distribution, tv.indexes as indexes,"
+        + " tv.comment as comment"
         + " FROM "
         + TABLE_NAME
-        + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND 
deleted_at = 0";
+        + " tm LEFT JOIN "
+        + TableVersionMapper.TABLE_NAME
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = 
tv.version AND tv.deleted_at = 0"
+        + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName} 
AND tm.deleted_at = 0";
   }
 
   public String selectTableMetaById(@Param("tableId") Long tableId) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
new file mode 100644
index 0000000000..3501abe10c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static 
org.apache.gravitino.storage.relational.mapper.TableVersionMapper.TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionBaseSQLProvider {
+
+  public String insertTableVersion(@Param("tablePO") TablePO tablePO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + " (table_id, format, properties, partitioning"
+        + " distribution, sort_orders, indexes, comment,"
+        + " version, last_version, deleted_at)"
+        + " VALUES ("
+        + " #{tablePO.tableId},"
+        + " #{tablePO.format},"
+        + " #{tablePO.properties},"
+        + " #{tablePO.partitions},"
+        + " #{tablePO.distribution},"
+        + " #{tablePO.sortOrders},"
+        + " #{tablePO.indexes},"
+        + " #{tablePO.comment},"
+        + " #{tablePO.currentVersion},"
+        + " #{tablePO.lastVersion},"
+        + " #{tablePO.deletedAt}"
+        + " )";
+  }
+
+  public String insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") 
TablePO tablePO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + " (table_id, format, properties, partitioning,"
+        + " distribution, sort_orders, indexes, comment,"
+        + " version, deleted_at)"
+        + " VALUES ("
+        + " #{tablePO.tableId},"
+        + " #{tablePO.format},"
+        + " #{tablePO.properties},"
+        + " #{tablePO.partitions},"
+        + " #{tablePO.distribution},"
+        + " #{tablePO.sortOrders},"
+        + " #{tablePO.indexes},"
+        + " #{tablePO.comment},"
+        + " #{tablePO.currentVersion},"
+        + " #{tablePO.deletedAt}"
+        + " )"
+        + " ON DUPLICATE KEY UPDATE"
+        + " format = #{tablePO.format},"
+        + " properties = #{tablePO.properties},"
+        + " partitioning = #{tablePO.partitions},"
+        + " distribution = #{tablePO.distribution},"
+        + " sort_orders = #{tablePO.sortOrders},"
+        + " indexes = #{tablePO.indexes},"
+        + " comment = #{tablePO.comment},"
+        + " version = #{tablePO.currentVersion},"
+        + " deleted_at = #{tablePO.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
new file mode 100644
index 0000000000..e0a7413b1c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -0,0 +1,24 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+
+public class TableVersionPostgreSQLProvider extends 
TableVersionBaseSQLProvider {}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
index 693105e772..56fea38337 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
@@ -20,7 +20,9 @@ package org.apache.gravitino.storage.relational.po;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import lombok.Getter;
 
+@Getter
 public class TablePO {
   private Long tableId;
   private String tableName;
@@ -32,6 +34,15 @@ public class TablePO {
   private Long lastVersion;
   private Long deletedAt;
 
+  private String format;
+
+  private String properties;
+  private String partitions;
+  private String sortOrders;
+  private String distribution;
+  private String indexes;
+  private String comment;
+
   public Long getTableId() {
     return tableId;
   }
@@ -154,6 +165,41 @@ public class TablePO {
       return this;
     }
 
+    public Builder withFormat(String format) {
+      tablePO.format = format;
+      return this;
+    }
+
+    public Builder withProperties(String properties) {
+      tablePO.properties = properties;
+      return this;
+    }
+
+    public Builder withPartitions(String partitions) {
+      tablePO.partitions = partitions;
+      return this;
+    }
+
+    public Builder withSortOrders(String sortOrders) {
+      tablePO.sortOrders = sortOrders;
+      return this;
+    }
+
+    public Builder withDistribution(String distribution) {
+      tablePO.distribution = distribution;
+      return this;
+    }
+
+    public Builder withIndexes(String indexes) {
+      tablePO.indexes = indexes;
+      return this;
+    }
+
+    public Builder withComment(String comment) {
+      tablePO.comment = comment;
+      return this;
+    }
+
     private void validate() {
       Preconditions.checkArgument(tablePO.tableId != null, "Table id is 
required");
       Preconditions.checkArgument(tablePO.tableName != null, "Table name is 
required");
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 326ba63b5f..f4bbf7a6f6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -40,6 +40,7 @@ import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMap
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
@@ -118,12 +119,12 @@ public class TableMetaService {
       fillTablePOBuilderParentEntityId(builder, tableEntity.namespace());
 
       AtomicReference<TablePO> tablePORef = new AtomicReference<>();
+      TablePO po = POConverters.initializeTablePOWithVersion(tableEntity, 
builder);
       SessionUtils.doMultipleWithCommit(
           () ->
               SessionUtils.doWithoutCommit(
                   TableMetaMapper.class,
                   mapper -> {
-                    TablePO po = 
POConverters.initializeTablePOWithVersion(tableEntity, builder);
                     tablePORef.set(po);
                     if (overwrite) {
                       mapper.insertTableMetaOnDuplicateKeyUpdate(po);
@@ -131,6 +132,18 @@ public class TableMetaService {
                       mapper.insertTableMeta(po);
                     }
                   }),
+          () ->
+              SessionUtils.doWithCommit(
+                  TableVersionMapper.class,
+                  mapper -> {
+                    if (po.getFormat() != null) {
+                      if (overwrite) {
+                        mapper.insertTableVersionOnDuplicateKeyUpdate(po);
+                      } else {
+                        mapper.insertTableVersion(po);
+                      }
+                    }
+                  }),
           () -> {
             // We need to delete the columns first if we want to overwrite the 
table.
             if (overwrite) {
@@ -292,7 +305,6 @@ public class TableMetaService {
         SessionUtils.getWithoutCommit(
             TableMetaMapper.class,
             mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, 
tableName));
-
     if (tablePO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 127cb022e8..62bc11f891 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -45,6 +45,7 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
@@ -60,6 +61,7 @@ import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.PolicyContent;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
@@ -390,14 +392,44 @@ public class POConverters {
   public static TablePO initializeTablePOWithVersion(
       TableEntity tableEntity, TablePO.Builder builder) {
     try {
-      return builder
+      builder
           .withTableId(tableEntity.id())
           .withTableName(tableEntity.name())
           
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.auditInfo()))
           .withCurrentVersion(INIT_VERSION)
           .withLastVersion(INIT_VERSION)
-          .withDeletedAt(DEFAULT_DELETED_AT)
-          .build();
+          .withDeletedAt(DEFAULT_DELETED_AT);
+
+      if (tableEntity instanceof GenericTableEntity genericTable) {
+        builder.withFormat(genericTable.getFormat());
+        builder.withComment(genericTable.getComment());
+        builder.withProperties(
+            genericTable.getProperties() == null
+                ? null
+                : 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+
+        // TODO store the following information to databases;
+        /**
+         * builder.withDistribution( genericTable.getDistribution() == null ? 
null :
+         * 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getDistribution()));
+         * builder.withPartitions( genericTable.getPartitions() == null ? null 
:
+         * 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getPartitions()));
+         */
+        builder.withIndexes(
+            genericTable.getIndexes() == null
+                ? null
+                : 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
+        builder.withProperties(
+            genericTable.getProperties() == null
+                ? null
+                : 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+        builder.withSortOrders(
+            genericTable.getSortOrder() == null
+                ? null
+                : 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getSortOrder()));
+      }
+
+      return builder.build();
     } catch (JsonProcessingException e) {
       throw new RuntimeException("Failed to serialize json object:", e);
     }
@@ -455,6 +487,29 @@ public class POConverters {
   public static TableEntity fromTableAndColumnPOs(
       TablePO tablePO, List<ColumnPO> columnPOs, Namespace namespace) {
     try {
+      if (tablePO.getFormat() != null) {
+        return GenericTableEntity.getBuilder()
+            .withId(tablePO.getTableId())
+            .withName(tablePO.getTableName())
+            .withNamespace(namespace)
+            .withColumns(fromColumnPOs(columnPOs))
+            .withAuditInfo(
+                JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(), 
AuditInfo.class))
+            // TODO add field partition, distribution and sort order;
+            .withIndexes(
+                StringUtils.isBlank(tablePO.getIndexes())
+                    ? null
+                    : 
JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(), IndexImpl[].class))
+            .withFormat(tablePO.getFormat())
+            .withComment(tablePO.getComment())
+            .withProperties(
+                StringUtils.isBlank(tablePO.getProperties())
+                    ? null
+                    : 
JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
+            .withColumns(fromColumnPOs(columnPOs))
+            .build();
+      }
+
       return TableEntity.builder()
           .withId(tablePO.getTableId())
           .withName(tablePO.getTableName())
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
index 752d89533d..0482bfecfd 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
@@ -106,4 +106,16 @@ public class SessionUtils {
       throw e;
     }
   }
+
+  public static void beginTransaction() {
+    SqlSessions.getSqlSession();
+  }
+
+  public static void commitTransaction() {
+    SqlSessions.commitAndCloseSqlSession();
+  }
+
+  public static void rollbackTransaction() {
+    SqlSessions.rollbackAndCloseSqlSession();
+  }
 }
diff --git a/scripts/h2/schema-1.1.0-h2.sql b/scripts/h2/schema-1.1.0-h2.sql
index 98a1217423..6172915f1f 100644
--- a/scripts/h2/schema-1.1.0-h2.sql
+++ b/scripts/h2/schema-1.1.0-h2.sql
@@ -1,22 +1,3 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
 --
 -- Licensed to the Apache Software Foundation (ASF) under one
 -- or more contributor license agreements.  See the NOTICE file--
diff --git a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql 
b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
index f76a2c2593..cf42a02b57 100644
--- a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
+++ b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
@@ -1,21 +1,21 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file--
+--  distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
 
 CREATE TABLE IF NOT EXISTS `table_version_info` (
     `table_id`        BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
diff --git a/scripts/mysql/schema-1.1.0-mysql.sql 
b/scripts/mysql/schema-1.1.0-mysql.sql
index c6bd8a81e3..ca9b351b03 100644
--- a/scripts/mysql/schema-1.1.0-mysql.sql
+++ b/scripts/mysql/schema-1.1.0-mysql.sql
@@ -1,22 +1,3 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
 --
 -- Licensed to the Apache Software Foundation (ASF) under one
 -- or more contributor license agreements.  See the NOTICE file--
diff --git a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql 
b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
index 5560993eb6..6663150f15 100644
--- a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
+++ b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
@@ -1,21 +1,21 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file--
+--  distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
 
 CREATE TABLE IF NOT EXISTS `table_version_info` (
     `table_id`        BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
diff --git a/scripts/postgresql/schema-1.1.0-postgresql.sql 
b/scripts/postgresql/schema-1.1.0-postgresql.sql
index bc69e7839b..c5bc6b3205 100644
--- a/scripts/postgresql/schema-1.1.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.1.0-postgresql.sql
@@ -1,22 +1,3 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
 --
 -- Licensed to the Apache Software Foundation (ASF) under one
 -- or more contributor license agreements.  See the NOTICE file--
diff --git a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql 
b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
index 882c9a6cc2..42d06e30a8 100644
--- a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
@@ -1,21 +1,21 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file--
+--  distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
 
 
 CREATE TABLE IF NOT EXISTS table_version_info (


Reply via email to