This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 70051f81f4 [#8921] improvement(catalogs): Add ITs for lance table 
operations (#9190)
70051f81f4 is described below

commit 70051f81f469583224762e36bb4e9e5ce6271e0c
Author: Junda Yang <[email protected]>
AuthorDate: Thu Nov 20 00:16:42 2025 -0800

    [#8921] improvement(catalogs): Add ITs for lance table operations (#9190)
    
    ### What changes were proposed in this pull request?
    This PR adds comprehensive integration tests for Lance table operations
    and fixes several related bugs:
    
    1. Integration Tests: Added CatalogGenericLakehouseLanceIT with tests
    for:
    - Lance table creation with catalog/schema-level location resolution
    - Lance table loading and listing
    - Lance data format validation using Apache Arrow (reading/writing data
    with Arrow IPC)
    
    2. Bug Fixes:
    - Fixed dropTable() in GenericLakehouseCatalogOperations to properly
    handle non-existent tables (return false instead of throwing exception)
    - Fixed SQL insertion bug in TableVersionBaseSQLProvider - removed
    invalid last_version column from INSERT statement
    - Added PostgreSQL-specific ON CONFLICT handling in
    TableVersionPostgreSQLProvider for table version upserts
    - Fixed TableMetaService.alterTable() to properly insert/update table
    version metadata
    - Enhanced POConverters.updateTablePO() to serialize GenericTableEntity
    fields (format, comment, properties, indexes) during table updates
    
    3. Dependency Updates: Added hadoop3.client.runtime to support Lance
    table operations
    
    4. Unit Tests: Added testLanceTableCreateAndUpdate in TestEntityStorage
    to test Lance table CRUD operations in the entity store
    
    
    ### Why are the changes needed?
    
    Lance table operations lacked integration test coverage, making it
    difficult to verify end-to-end functionality. Several bugs were
    discovered during test development:
    - Dropping non-existent tables threw exceptions instead of returning
    false
    - Table version metadata wasn't being properly persisted during table
    alterations
    - PostgreSQL INSERT operations failed on duplicate keys without proper
    conflict handling
    - Generic table properties weren't being serialized during updates
    
    These issues would have prevented Lance tables from working correctly in
    production.
    
    Fix: #8921
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The changes are internal improvements (tests and bug fixes) that
    don't alter user-facing APIs or behavior.
    
    ### How was this patch tested?
    
    Unit tests and IT tests
    
    Co-authored-by: Mini Yu <[email protected]>
---
 .../catalog-generic-lakehouse/build.gradle.kts     |   1 +
 .../GenericLakehouseCatalogOperations.java         |  39 +-
 .../test/CatalogGenericLakehouseLanceIT.java       | 419 +++++++++++++++++++++
 .../provider/base/TableVersionBaseSQLProvider.java |   5 +-
 .../postgresql/TableVersionPostgreSQLProvider.java |  37 +-
 .../relational/service/TableMetaService.java       |   8 +
 .../storage/relational/utils/POConverters.java     |  40 +-
 .../gravitino/storage/TestEntityStorage.java       | 135 +++++++
 8 files changed, 653 insertions(+), 31 deletions(-)

diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts 
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index 704dbda7e3..df401dcde4 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,6 +43,7 @@ dependencies {
   implementation(libs.commons.lang3)
   implementation(libs.guava)
   implementation(libs.hadoop3.client.api)
+  implementation(libs.hadoop3.client.runtime)
   implementation(libs.lance)
 
   annotationProcessor(libs.lombok)
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 32a341bc2b..459a15afd1 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -62,6 +62,8 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Operations for interacting with a generic lakehouse catalog in Apache 
Gravitino.
@@ -71,6 +73,8 @@ import org.apache.hadoop.fs.Path;
  */
 public class GenericLakehouseCatalogOperations
     implements CatalogOperations, SupportsSchemas, TableCatalog {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GenericLakehouseCatalogOperations.class);
 
   private static final String SLASH = "/";
 
@@ -299,24 +303,27 @@ public class GenericLakehouseCatalogOperations
   @Override
   public boolean dropTable(NameIdentifier ident) {
     EntityStore store = GravitinoEnv.getInstance().entityStore();
-    Namespace namespace = ident.namespace();
+    GenericTableEntity tableEntity;
     try {
-      GenericTableEntity tableEntity =
-          store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
-      Map<String, String> tableProperties = tableEntity.getProperties();
-      String format = tableProperties.getOrDefault("format", "lance");
-      LakehouseCatalogOperations lakehouseCatalogOperations =
-          SUPPORTED_FORMATS.compute(
-              format,
-              (k, v) ->
-                  v == null
-                      ? createLakehouseCatalogOperations(
-                          format, tableProperties, catalogInfo, 
propertiesMetadata)
-                      : v);
-      return lakehouseCatalogOperations.dropTable(ident);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
+      tableEntity = store.get(ident, Entity.EntityType.TABLE, 
GenericTableEntity.class);
+    } catch (NoSuchEntityException e) {
+      LOG.warn("Table {} does not exist, skip dropping.", ident);
+      return false;
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to get table " + ident);
     }
+
+    Map<String, String> tableProperties = tableEntity.getProperties();
+    String format = tableProperties.getOrDefault("format", "lance");
+    LakehouseCatalogOperations lakehouseCatalogOperations =
+        SUPPORTED_FORMATS.compute(
+            format,
+            (k, v) ->
+                v == null
+                    ? createLakehouseCatalogOperations(
+                        format, tableProperties, catalogInfo, 
propertiesMetadata)
+                    : v);
+    return lakehouseCatalogOperations.dropTable(ident);
   }
 
   private String ensureTrailingSlash(String path) {
diff --git 
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
 
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
new file mode 100644
index 0000000000..c4790c4d85
--- /dev/null
+++ 
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.integration.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.ipc.LanceScanner;
+import com.lancedb.lance.ipc.ScanOptions;
+import com.lancedb.lance.operation.Append;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogGenericLakehouseLanceIT extends BaseIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogGenericLakehouseLanceIT.class);
+  public static final String metalakeName =
+      GravitinoITUtils.genRandomName("CatalogGenericLakeLanceIT_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("CatalogGenericLakeLanceI_catalog");
+  public String SCHEMA_PREFIX = "CatalogGenericLakeLance_schema";
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  public String TABLE_PREFIX = "CatalogGenericLakeLance_table";
+  public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+  public static final String TABLE_COMMENT = "table_comment";
+  public static final String LANCE_COL_NAME1 = "lance_col_name1";
+  public static final String LANCE_COL_NAME2 = "lance_col_name2";
+  public static final String LANCE_COL_NAME3 = "lance_col_name3";
+  protected final String provider = "generic-lakehouse";
+  protected final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  protected GravitinoMetalake metalake;
+  protected Catalog catalog;
+  protected String tempDirectory;
+
+  @BeforeAll
+  public void startup() throws Exception {
+    createMetalake();
+    createCatalog();
+    createSchema();
+
+    // Create a temp directory for test use
+    Path tempDir = Files.createTempDirectory("myTempDir");
+    tempDirectory = tempDir.toString();
+    File file = new File(tempDirectory);
+    file.deleteOnExit();
+  }
+
+  @AfterAll
+  public void stop() throws IOException {
+    if (client != null) {
+      Arrays.stream(catalog.asSchemas().listSchemas())
+          .filter(schema -> !schema.equals("default"))
+          .forEach(
+              (schema -> {
+                catalog.asSchemas().dropSchema(schema, true);
+              }));
+      Arrays.stream(metalake.listCatalogs())
+          .forEach(
+              catalogName -> {
+                metalake.dropCatalog(catalogName, true);
+              });
+      client.dropMetalake(metalakeName, true);
+    }
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close CloseableGroup", e);
+    }
+
+    client = null;
+  }
+
+  @AfterEach
+  public void resetSchema() throws InterruptedException {
+    catalog.asSchemas().dropSchema(schemaName, true);
+    createSchema();
+  }
+
+  @Test
+  public void testCreateLanceTable() throws InterruptedException {
+    // Create a table from Gravitino API
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put("format", "lance");
+    properties.put("location", tableLocation);
+
+    Table createdTable =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier,
+                columns,
+                TABLE_COMMENT,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+
+    Assertions.assertEquals(createdTable.name(), tableName);
+    Map<String, String> createdTableProperties = createdTable.properties();
+    Assertions.assertEquals("lance", createdTableProperties.get("format"));
+
+    Assertions.assertEquals(TABLE_COMMENT, createdTable.comment());
+    Assertions.assertEquals(3, createdTable.columns().length);
+    columnEquals(columns, createdTable.columns());
+    String expectedTableLocation = tempDirectory + "/" + tableName + "/";
+    Assertions.assertEquals(expectedTableLocation, 
createdTableProperties.get("location"));
+    Assertions.assertTrue(new File(expectedTableLocation).exists());
+
+    // Drop table
+    catalog.asTableCatalog().dropTable(nameIdentifier);
+    catalog.asSchemas().dropSchema(schemaName, true);
+
+    Map<String, String> schemaProperties = createSchemaProperties();
+    String schemaLocation = tempDirectory + "/schema_location";
+    schemaProperties.put("location", schemaLocation);
+    catalog.asSchemas().createSchema(schemaName, "comment", schemaProperties);
+    properties = createProperties();
+    properties.put("format", "lance");
+
+    createdTable =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier,
+                columns,
+                TABLE_COMMENT,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+    Assertions.assertEquals(createdTable.name(), tableName);
+    createdTableProperties = createdTable.properties();
+    Assertions.assertEquals("lance", createdTableProperties.get("format"));
+
+    Assertions.assertEquals(TABLE_COMMENT, createdTable.comment());
+    Assertions.assertEquals(3, createdTable.columns().length);
+    columnEquals(columns, createdTable.columns());
+    expectedTableLocation = schemaLocation + "/" + tableName + "/";
+    Assertions.assertEquals(expectedTableLocation, 
createdTableProperties.get("location"));
+    Assertions.assertTrue(new File(expectedTableLocation).exists());
+
+    // Now try to load table
+    Table loadedTable = catalog.asTableCatalog().loadTable(nameIdentifier);
+    Assertions.assertEquals(createdTable.name(), loadedTable.name());
+    Map<String, String> loadedTableProperties = loadedTable.properties();
+    Assertions.assertEquals("lance", loadedTableProperties.get("format"));
+    Assertions.assertEquals(expectedTableLocation, 
loadedTableProperties.get("location"));
+    Assertions.assertEquals(TABLE_COMMENT, loadedTable.comment());
+
+    // Now test list tables
+    List<NameIdentifier> tableIdentifiers =
+        
Arrays.asList(catalog.asTableCatalog().listTables(nameIdentifier.namespace()));
+    Assertions.assertEquals(1, tableIdentifiers.size());
+    Assertions.assertEquals(nameIdentifier, tableIdentifiers.get(0));
+  }
+
+  @Test
+  void testLanceTableFormat() {
+    String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+    Column[] columns = createColumns();
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = createProperties();
+    String tableLocation = tempDirectory + "/" + tableName;
+    properties.put("format", "lance");
+    properties.put("location", tableLocation);
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            nameIdentifier,
+            columns,
+            TABLE_COMMENT,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            null,
+            null);
+
+    // Now try to read the lance directory and check it.
+    try (Dataset dataset = Dataset.open(tableLocation)) {
+      org.apache.arrow.vector.types.pojo.Schema lanceSchema = 
dataset.getSchema();
+      List<Field> fields = lanceSchema.getFields();
+      for (Field field : fields) {
+        if (field.getName().equals(LANCE_COL_NAME1)) {
+          Assertions.assertEquals(new ArrowType.Int(32, true), 
field.getType());
+        } else if (field.getName().equals(LANCE_COL_NAME2)) {
+          Assertions.assertEquals(new ArrowType.Int(64, true), 
field.getType());
+        } else if (field.getName().equals(LANCE_COL_NAME3)) {
+          Assertions.assertEquals(new ArrowType.Utf8(), field.getType());
+        } else {
+          Assertions.fail("Unexpected column name in lance table: " + 
field.getName());
+        }
+      }
+
+      // Now try to write some data to the dataset
+      Transaction trans =
+          dataset
+              .newTransactionBuilder()
+              .operation(
+                  Append.builder()
+                      .fragments(
+                          createFragmentMetadata(
+                              tableLocation,
+                              Arrays.asList(
+                                  new LanceDataValue(1, 100L, "first"),
+                                  new LanceDataValue(2, 200L, "second"),
+                                  new LanceDataValue(3, 300L, "third")),
+                              lanceSchema))
+                      .build())
+              .writeParams(ImmutableMap.of())
+              .build();
+
+      Dataset newDataset = dataset.commitTransaction(trans);
+      try (LanceScanner scanner =
+          newDataset.newScan(
+              new ScanOptions.Builder()
+                  .columns(Arrays.asList(LANCE_COL_NAME1, LANCE_COL_NAME2, 
LANCE_COL_NAME3))
+                  .batchSize(1000)
+                  .build())) {
+
+        List<LanceDataValue> dataValues = Lists.newArrayList();
+        try (ArrowReader reader = scanner.scanBatches()) {
+          while (reader.loadNextBatch()) {
+            VectorSchemaRoot root = reader.getVectorSchemaRoot();
+            List<FieldVector> fieldVectors = root.getFieldVectors();
+
+            IntVector col1Vector = (IntVector) fieldVectors.get(0);
+            BigIntVector col2Vector = (BigIntVector) fieldVectors.get(1);
+            VarCharVector col3Vector = (VarCharVector) fieldVectors.get(2);
+
+            for (int i = 0; i < root.getRowCount(); i++) {
+              int col1 = col1Vector.get(i);
+              long col2 = col2Vector.get(i);
+              String col3 = new String(col3Vector.get(i), 
StandardCharsets.UTF_8);
+              dataValues.add(new LanceDataValue(col1, col2, col3));
+            }
+          }
+        }
+
+        Assertions.assertEquals(3, dataValues.size());
+        Assertions.assertEquals(1, dataValues.get(0).col1);
+        Assertions.assertEquals(100L, dataValues.get(0).col2);
+        Assertions.assertEquals("first", dataValues.get(0).col3);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static class LanceDataValue {
+    public Integer col1;
+    public Long col2;
+    public String col3;
+
+    public LanceDataValue(Integer col1, Long col2, String col3) {
+      this.col1 = col1;
+      this.col2 = col2;
+      this.col3 = col3;
+    }
+  }
+
+  private List<FragmentMetadata> createFragmentMetadata(
+      String tableLocation,
+      List<LanceDataValue> updates,
+      org.apache.arrow.vector.types.pojo.Schema schema)
+      throws JsonProcessingException {
+    List<FragmentMetadata> fragmentMetas;
+    int count = 0;
+    RootAllocator rootAllocator = new RootAllocator();
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
rootAllocator)) {
+      for (FieldVector vector : root.getFieldVectors()) {
+        vector.setInitialCapacity(count);
+      }
+      root.allocateNew();
+
+      IntVector col1Vector = (IntVector) root.getVector(LANCE_COL_NAME1);
+      BigIntVector col2Vector = (BigIntVector) root.getVector(LANCE_COL_NAME2);
+      VarCharVector col3Vector = (VarCharVector) 
root.getVector(LANCE_COL_NAME3);
+
+      int index = 0;
+      for (LanceDataValue data : updates) {
+        col1Vector.setSafe(index, data.col1);
+        col2Vector.setSafe(index, data.col2);
+        col3Vector.setSafe(index, data.col3.getBytes(StandardCharsets.UTF_8));
+        index++;
+      }
+      root.setRowCount(index);
+
+      fragmentMetas =
+          Fragment.create(tableLocation, rootAllocator, root, new 
WriteParams.Builder().build());
+      return fragmentMetas;
+    }
+  }
+
+  protected Map<String, String> createSchemaProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    return properties;
+  }
+
+  private void columnEquals(Column[] expect, Column[] actual) {
+    Assertions.assertEquals(expect.length, actual.length);
+
+    for (int i = 0; i < expect.length; i++) {
+      Column expectCol = expect[i];
+      Column actualCol = actual[i];
+
+      Assertions.assertEquals(expectCol.name(), actualCol.name());
+      Assertions.assertEquals(expectCol.dataType(), actualCol.dataType());
+      Assertions.assertEquals(expectCol.comment(), actualCol.comment());
+    }
+  }
+
+  private void createMetalake() {
+    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+    Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  protected void createCatalog() {
+    Map<String, String> properties = Maps.newHashMap();
+    metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider, 
"comment", properties);
+
+    catalog = metalake.loadCatalog(catalogName);
+  }
+
+  private void createSchema() throws InterruptedException {
+    Map<String, String> schemaProperties = createSchemaProperties();
+    String comment = "comment";
+    catalog.asSchemas().createSchema(schemaName, comment, schemaProperties);
+    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+    Assertions.assertEquals(schemaName, loadSchema.name());
+    Assertions.assertEquals(comment, loadSchema.comment());
+    Assertions.assertEquals("val1", loadSchema.properties().get("key1"));
+    Assertions.assertEquals("val2", loadSchema.properties().get("key2"));
+  }
+
+  private Column[] createColumns() {
+    Column col1 = Column.of(LANCE_COL_NAME1, Types.IntegerType.get(), 
"col_1_comment");
+    Column col2 = Column.of(LANCE_COL_NAME2, Types.LongType.get(), 
"col_2_comment");
+    Column col3 = Column.of(LANCE_COL_NAME3, Types.StringType.get(), 
"col_3_comment");
+    return new Column[] {col1, col2, col3};
+  }
+
+  protected Map<String, String> createProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+
+    return properties;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
index 3501abe10c..c39b8cbabb 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -29,9 +29,9 @@ public class TableVersionBaseSQLProvider {
   public String insertTableVersion(@Param("tablePO") TablePO tablePO) {
     return "INSERT INTO "
         + TABLE_NAME
-        + " (table_id, format, properties, partitioning"
+        + " (table_id, format, properties, partitioning,"
         + " distribution, sort_orders, indexes, comment,"
-        + " version, last_version, deleted_at)"
+        + " version, deleted_at)"
         + " VALUES ("
         + " #{tablePO.tableId},"
         + " #{tablePO.format},"
@@ -42,7 +42,6 @@ public class TableVersionBaseSQLProvider {
         + " #{tablePO.indexes},"
         + " #{tablePO.comment},"
         + " #{tablePO.currentVersion},"
-        + " #{tablePO.lastVersion},"
         + " #{tablePO.deletedAt}"
         + " )";
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
index e0a7413b1c..13eebeaa2c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -19,6 +19,41 @@
 
 package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
 
+import static 
org.apache.gravitino.storage.relational.mapper.TableVersionMapper.TABLE_NAME;
+
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionPostgreSQLProvider extends 
TableVersionBaseSQLProvider {
 
-public class TableVersionPostgreSQLProvider extends 
TableVersionBaseSQLProvider {}
+  public String insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") 
TablePO tablePO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + " (table_id, format, properties, partitioning,"
+        + " distribution, sort_orders, indexes, comment,"
+        + " version, deleted_at)"
+        + " VALUES ("
+        + " #{tablePO.tableId},"
+        + " #{tablePO.format},"
+        + " #{tablePO.properties},"
+        + " #{tablePO.partitions},"
+        + " #{tablePO.distribution},"
+        + " #{tablePO.sortOrders},"
+        + " #{tablePO.indexes},"
+        + " #{tablePO.comment},"
+        + " #{tablePO.currentVersion},"
+        + " #{tablePO.deletedAt}"
+        + " )"
+        + " ON CONFLICT (table_id, deleted_at) DO UPDATE SET"
+        + " format = #{tablePO.format},"
+        + " properties = #{tablePO.properties},"
+        + " partitioning = #{tablePO.partitions},"
+        + " distribution = #{tablePO.distribution},"
+        + " sort_orders = #{tablePO.sortOrders},"
+        + " indexes = #{tablePO.indexes},"
+        + " comment = #{tablePO.comment},"
+        + " version = #{tablePO.currentVersion},"
+        + " deleted_at = #{tablePO.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index f4bbf7a6f6..7a42d95db6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -210,6 +210,14 @@ public class TableMetaService {
                   SessionUtils.getWithoutCommit(
                       TableMetaMapper.class,
                       mapper -> mapper.updateTableMeta(newTablePO, oldTablePO, 
newSchemaId))),
+          () ->
+              SessionUtils.doWithCommit(
+                  TableVersionMapper.class,
+                  mapper -> {
+                    if (newTablePO.getFormat() != null) {
+                      
mapper.insertTableVersionOnDuplicateKeyUpdate(newTablePO);
+                    }
+                  }),
           () -> {
             if (updateResult.get() > 0 && (isColumnChanged || 
isSchemaChanged)) {
               TableColumnMetaService.getInstance()
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 62bc11f891..fa8f06a9f8 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -457,17 +457,35 @@ public class POConverters {
     }
 
     try {
-      return TablePO.builder()
-          .withTableId(oldTablePO.getTableId())
-          .withTableName(newTable.name())
-          .withMetalakeId(oldTablePO.getMetalakeId())
-          .withCatalogId(oldTablePO.getCatalogId())
-          .withSchemaId(newSchemaId)
-          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo()))
-          .withCurrentVersion(currentVersion)
-          .withLastVersion(lastVersion)
-          .withDeletedAt(DEFAULT_DELETED_AT)
-          .build();
+      TablePO.Builder builder =
+          TablePO.builder()
+              .withTableId(oldTablePO.getTableId())
+              .withTableName(newTable.name())
+              .withMetalakeId(oldTablePO.getMetalakeId())
+              .withCatalogId(oldTablePO.getCatalogId())
+              .withSchemaId(newSchemaId)
+              
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo()))
+              .withCurrentVersion(currentVersion)
+              .withLastVersion(lastVersion)
+              .withDeletedAt(DEFAULT_DELETED_AT);
+
+      // Note: GenericTableEntity will be removed in the refactor PR, so here 
just keep the old
+      // logic to make the UT pass.
+      if (newTable instanceof GenericTableEntity genericTable) {
+        builder.withFormat(genericTable.getFormat());
+        builder.withComment(genericTable.getComment());
+        builder.withProperties(
+            genericTable.getProperties() == null
+                ? null
+                : 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+        builder.withIndexes(
+            genericTable.getIndexes() == null
+                ? null
+                : 
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
+        // TODO other fields in the refactor PRs.
+      }
+
+      return builder.build();
     } catch (JsonProcessingException e) {
       throw new RuntimeException("Failed to serialize json object:", e);
     }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 143e979f69..5ff904eb31 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -81,6 +81,7 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
@@ -2792,4 +2793,138 @@ public class TestEntityStorage {
       destroy(type);
     }
   }
+
+  @ParameterizedTest
+  @MethodSource("storageProvider")
+  void testLanceTableCreateAndUpdate(String type) {
+    Config config = Mockito.mock(Config.class);
+    init(type, config);
+
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
+      store.initialize(config);
+
+      BaseMetalake metalake =
+          createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", 
auditInfo);
+      store.put(metalake, false);
+
+      CatalogEntity catalogEntity =
+          CatalogEntity.builder()
+              .withId(RandomIdGenerator.INSTANCE.nextId())
+              .withName("catalog")
+              .withNamespace(NamespaceUtil.ofCatalog("metalake"))
+              .withType(Catalog.Type.RELATIONAL)
+              .withProvider("generic-lakehouse")
+              .withComment("This is a generic-lakehouse")
+              .withProperties(ImmutableMap.of())
+              .withAuditInfo(auditInfo)
+              .build();
+
+      store.put(catalogEntity, false);
+
+      SchemaEntity schemaEntity =
+          SchemaEntity.builder()
+              .withId(RandomIdGenerator.INSTANCE.nextId())
+              .withName("schema")
+              .withNamespace(NamespaceUtil.ofSchema("metalake", "catalog"))
+              .withComment("This is a schema for generic-lakehouse")
+              .withProperties(ImmutableMap.of())
+              .withAuditInfo(auditInfo)
+              .build();
+      store.put(schemaEntity, false);
+
+      long column1Id = RandomIdGenerator.INSTANCE.nextId();
+      GenericTableEntity table =
+          GenericTableEntity.getBuilder()
+              .withId(RandomIdGenerator.INSTANCE.nextId())
+              .withNamespace(NamespaceUtil.ofTable("metalake", "catalog", 
"schema"))
+              .withName("table")
+              .withAuditInfo(auditInfo)
+              .withColumns(
+                  Lists.newArrayList(
+                      ColumnEntity.builder()
+                          .withId(column1Id)
+                          .withName("column1")
+                          .withDataType(Types.StringType.get())
+                          .withComment("test column")
+                          .withPosition(1)
+                          .withAuditInfo(auditInfo)
+                          .build()))
+              .withComment("This is a lance table")
+              .withFormat("lance")
+              .withProperties(ImmutableMap.of("location", "/tmp/test", 
"format", "lance"))
+              .build();
+      store.put(table, false);
+      GenericTableEntity fetchedTable =
+          store.get(table.nameIdentifier(), Entity.EntityType.TABLE, 
GenericTableEntity.class);
+
+      // check table properties
+      Assertions.assertEquals("/tmp/test", 
fetchedTable.getProperties().get("location"));
+      Assertions.assertEquals("lance", 
fetchedTable.getProperties().get("format"));
+      Assertions.assertEquals("This is a lance table", 
fetchedTable.getComment());
+      Assertions.assertEquals(1, fetchedTable.columns().size());
+      Assertions.assertEquals("column1", fetchedTable.columns().get(0).name());
+
+      // Now try to update the table
+      GenericTableEntity updatedTable =
+          GenericTableEntity.getBuilder()
+              .withId(table.id())
+              .withNamespace(table.namespace())
+              .withName(table.name())
+              .withAuditInfo(auditInfo)
+              .withFormat("lance")
+              .withColumns(
+                  Lists.newArrayList(
+                      ColumnEntity.builder()
+                          .withId(column1Id)
+                          .withName("column1")
+                          .withDataType(Types.StringType.get())
+                          .withComment("updated test column")
+                          .withPosition(1)
+                          .withAuditInfo(auditInfo)
+                          .build(),
+                      ColumnEntity.builder()
+                          .withId(RandomIdGenerator.INSTANCE.nextId())
+                          .withName("column2")
+                          .withDataType(Types.IntegerType.get())
+                          .withComment("new column")
+                          .withPosition(2)
+                          .withAuditInfo(auditInfo)
+                          .build()))
+              .withComment("This is an updated lance table")
+              .withProperties(ImmutableMap.of("location", "/tmp/updated_test", 
"format", "lance"))
+              .build();
+
+      store.update(
+          table.nameIdentifier(),
+          GenericTableEntity.class,
+          Entity.EntityType.TABLE,
+          e -> updatedTable);
+      GenericTableEntity fetchedUpdatedTable =
+          store.get(table.nameIdentifier(), Entity.EntityType.TABLE, 
GenericTableEntity.class);
+
+      // check updated table properties
+      Assertions.assertEquals(
+          "/tmp/updated_test", 
fetchedUpdatedTable.getProperties().get("location"));
+      Assertions.assertEquals("lance", 
fetchedUpdatedTable.getProperties().get("format"));
+      Assertions.assertEquals("This is an updated lance table", 
fetchedUpdatedTable.getComment());
+      Assertions.assertEquals(2, fetchedUpdatedTable.columns().size());
+      for (ColumnEntity column : fetchedUpdatedTable.columns()) {
+        if (column.name().equals("column1")) {
+          Assertions.assertEquals("updated test column", column.comment());
+        }
+      }
+
+      Assertions.assertTrue(
+          fetchedUpdatedTable.columns().stream()
+              .filter(c -> c.name().equals("column2"))
+              .findFirst()
+              .isPresent());
+      destroy(type);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }


Reply via email to