This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 70051f81f4 [#8921] improvement(catalogs): Add ITs for lance table
operations (#9190)
70051f81f4 is described below
commit 70051f81f469583224762e36bb4e9e5ce6271e0c
Author: Junda Yang <[email protected]>
AuthorDate: Thu Nov 20 00:16:42 2025 -0800
[#8921] improvement(catalogs): Add ITs for lance table operations (#9190)
### What changes were proposed in this pull request?
This PR adds comprehensive integration tests for Lance table operations
and fixes several related bugs:
1. Integration Tests: Added CatalogGenericLakehouseLanceIT with tests
for:
- Lance table creation with catalog/schema-level location resolution
- Lance table loading and listing
- Lance data format validation using Apache Arrow (reading/writing data
with Arrow IPC)
2. Bug Fixes:
- Fixed dropTable() in GenericLakehouseCatalogOperations to properly
handle non-existent tables (returning false instead of throwing an exception)
- Fixed a SQL insertion bug in TableVersionBaseSQLProvider - removed the
invalid last_version column from the INSERT statement
- Added PostgreSQL-specific ON CONFLICT handling in
TableVersionPostgreSQLProvider for table version upserts
- Fixed TableMetaService.alterTable() to properly insert/update table
version metadata
- Enhanced POConverters.updateTablePO() to serialize GenericTableEntity
fields (format, comment, properties, indexes) during table updates
3. Dependency Updates: Added hadoop3.client.runtime to support Lance
table operations
4. Unit Tests: Added testLanceTableCreateAndUpdate in TestEntityStorage
to test Lance table CRUD operations in the entity store
### Why are the changes needed?
Lance table operations lacked integration test coverage, making it
difficult to verify end-to-end functionality. Several bugs were
discovered during test development:
- Dropping non-existent tables threw exceptions instead of returning
false
- Table version metadata wasn't being properly persisted during table
alterations
- PostgreSQL INSERT operations failed on duplicate keys without proper
conflict handling
- Generic table properties weren't being serialized during updates
These issues would have prevented Lance tables from working correctly in
production.
Fix: #8921
### Does this PR introduce _any_ user-facing change?
No. The changes are internal improvements (tests and bug fixes) that
don't alter user-facing APIs or behavior.
### How was this patch tested?
Unit tests and integration tests
Co-authored-by: Mini Yu <[email protected]>
---
.../catalog-generic-lakehouse/build.gradle.kts | 1 +
.../GenericLakehouseCatalogOperations.java | 39 +-
.../test/CatalogGenericLakehouseLanceIT.java | 419 +++++++++++++++++++++
.../provider/base/TableVersionBaseSQLProvider.java | 5 +-
.../postgresql/TableVersionPostgreSQLProvider.java | 37 +-
.../relational/service/TableMetaService.java | 8 +
.../storage/relational/utils/POConverters.java | 40 +-
.../gravitino/storage/TestEntityStorage.java | 135 +++++++
8 files changed, 653 insertions(+), 31 deletions(-)
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index 704dbda7e3..df401dcde4 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,6 +43,7 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.hadoop3.client.api)
+ implementation(libs.hadoop3.client.runtime)
implementation(libs.lance)
annotationProcessor(libs.lombok)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 32a341bc2b..459a15afd1 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -62,6 +62,8 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Operations for interacting with a generic lakehouse catalog in Apache
Gravitino.
@@ -71,6 +73,8 @@ import org.apache.hadoop.fs.Path;
*/
public class GenericLakehouseCatalogOperations
implements CatalogOperations, SupportsSchemas, TableCatalog {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GenericLakehouseCatalogOperations.class);
private static final String SLASH = "/";
@@ -299,24 +303,27 @@ public class GenericLakehouseCatalogOperations
@Override
public boolean dropTable(NameIdentifier ident) {
EntityStore store = GravitinoEnv.getInstance().entityStore();
- Namespace namespace = ident.namespace();
+ GenericTableEntity tableEntity;
try {
- GenericTableEntity tableEntity =
- store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
- Map<String, String> tableProperties = tableEntity.getProperties();
- String format = tableProperties.getOrDefault("format", "lance");
- LakehouseCatalogOperations lakehouseCatalogOperations =
- SUPPORTED_FORMATS.compute(
- format,
- (k, v) ->
- v == null
- ? createLakehouseCatalogOperations(
- format, tableProperties, catalogInfo,
propertiesMetadata)
- : v);
- return lakehouseCatalogOperations.dropTable(ident);
- } catch (IOException e) {
- throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
+ tableEntity = store.get(ident, Entity.EntityType.TABLE,
GenericTableEntity.class);
+ } catch (NoSuchEntityException e) {
+ LOG.warn("Table {} does not exist, skip dropping.", ident);
+ return false;
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to get table " + ident);
}
+
+ Map<String, String> tableProperties = tableEntity.getProperties();
+ String format = tableProperties.getOrDefault("format", "lance");
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, tableProperties, catalogInfo,
propertiesMetadata)
+ : v);
+ return lakehouseCatalogOperations.dropTable(ident);
}
private String ensureTrailingSlash(String path) {
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
new file mode 100644
index 0000000000..c4790c4d85
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.integration.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.ipc.LanceScanner;
+import com.lancedb.lance.ipc.ScanOptions;
+import com.lancedb.lance.operation.Append;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogGenericLakehouseLanceIT extends BaseIT {
+ private static final Logger LOG =
LoggerFactory.getLogger(CatalogGenericLakehouseLanceIT.class);
+ public static final String metalakeName =
+ GravitinoITUtils.genRandomName("CatalogGenericLakeLanceIT_metalake");
+ public String catalogName =
GravitinoITUtils.genRandomName("CatalogGenericLakeLanceI_catalog");
+ public String SCHEMA_PREFIX = "CatalogGenericLakeLance_schema";
+ public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ public String TABLE_PREFIX = "CatalogGenericLakeLance_table";
+ public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+ public static final String TABLE_COMMENT = "table_comment";
+ public static final String LANCE_COL_NAME1 = "lance_col_name1";
+ public static final String LANCE_COL_NAME2 = "lance_col_name2";
+ public static final String LANCE_COL_NAME3 = "lance_col_name3";
+ protected final String provider = "generic-lakehouse";
+ protected final ContainerSuite containerSuite = ContainerSuite.getInstance();
+ protected GravitinoMetalake metalake;
+ protected Catalog catalog;
+ protected String tempDirectory;
+
+ @BeforeAll
+ public void startup() throws Exception {
+ createMetalake();
+ createCatalog();
+ createSchema();
+
+ // Create a temp directory for test use
+ Path tempDir = Files.createTempDirectory("myTempDir");
+ tempDirectory = tempDir.toString();
+ File file = new File(tempDirectory);
+ file.deleteOnExit();
+ }
+
+ @AfterAll
+ public void stop() throws IOException {
+ if (client != null) {
+ Arrays.stream(catalog.asSchemas().listSchemas())
+ .filter(schema -> !schema.equals("default"))
+ .forEach(
+ (schema -> {
+ catalog.asSchemas().dropSchema(schema, true);
+ }));
+ Arrays.stream(metalake.listCatalogs())
+ .forEach(
+ catalogName -> {
+ metalake.dropCatalog(catalogName, true);
+ });
+ client.dropMetalake(metalakeName, true);
+ }
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close CloseableGroup", e);
+ }
+
+ client = null;
+ }
+
+ @AfterEach
+ public void resetSchema() throws InterruptedException {
+ catalog.asSchemas().dropSchema(schemaName, true);
+ createSchema();
+ }
+
+ @Test
+ public void testCreateLanceTable() throws InterruptedException {
+ // Create a table from Gravitino API
+ Column[] columns = createColumns();
+ NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+ Map<String, String> properties = createProperties();
+ String tableLocation = tempDirectory + "/" + tableName;
+ properties.put("format", "lance");
+ properties.put("location", tableLocation);
+
+ Table createdTable =
+ catalog
+ .asTableCatalog()
+ .createTable(
+ nameIdentifier,
+ columns,
+ TABLE_COMMENT,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ null,
+ null);
+
+ Assertions.assertEquals(createdTable.name(), tableName);
+ Map<String, String> createdTableProperties = createdTable.properties();
+ Assertions.assertEquals("lance", createdTableProperties.get("format"));
+
+ Assertions.assertEquals(TABLE_COMMENT, createdTable.comment());
+ Assertions.assertEquals(3, createdTable.columns().length);
+ columnEquals(columns, createdTable.columns());
+ String expectedTableLocation = tempDirectory + "/" + tableName + "/";
+ Assertions.assertEquals(expectedTableLocation,
createdTableProperties.get("location"));
+ Assertions.assertTrue(new File(expectedTableLocation).exists());
+
+ // Drop table
+ catalog.asTableCatalog().dropTable(nameIdentifier);
+ catalog.asSchemas().dropSchema(schemaName, true);
+
+ Map<String, String> schemaProperties = createSchemaProperties();
+ String schemaLocation = tempDirectory + "/schema_location";
+ schemaProperties.put("location", schemaLocation);
+ catalog.asSchemas().createSchema(schemaName, "comment", schemaProperties);
+ properties = createProperties();
+ properties.put("format", "lance");
+
+ createdTable =
+ catalog
+ .asTableCatalog()
+ .createTable(
+ nameIdentifier,
+ columns,
+ TABLE_COMMENT,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ null,
+ null);
+ Assertions.assertEquals(createdTable.name(), tableName);
+ createdTableProperties = createdTable.properties();
+ Assertions.assertEquals("lance", createdTableProperties.get("format"));
+
+ Assertions.assertEquals(TABLE_COMMENT, createdTable.comment());
+ Assertions.assertEquals(3, createdTable.columns().length);
+ columnEquals(columns, createdTable.columns());
+ expectedTableLocation = schemaLocation + "/" + tableName + "/";
+ Assertions.assertEquals(expectedTableLocation,
createdTableProperties.get("location"));
+ Assertions.assertTrue(new File(expectedTableLocation).exists());
+
+ // Now try to load table
+ Table loadedTable = catalog.asTableCatalog().loadTable(nameIdentifier);
+ Assertions.assertEquals(createdTable.name(), loadedTable.name());
+ Map<String, String> loadedTableProperties = loadedTable.properties();
+ Assertions.assertEquals("lance", loadedTableProperties.get("format"));
+ Assertions.assertEquals(expectedTableLocation,
loadedTableProperties.get("location"));
+ Assertions.assertEquals(TABLE_COMMENT, loadedTable.comment());
+
+ // Now test list tables
+ List<NameIdentifier> tableIdentifiers =
+
Arrays.asList(catalog.asTableCatalog().listTables(nameIdentifier.namespace()));
+ Assertions.assertEquals(1, tableIdentifiers.size());
+ Assertions.assertEquals(nameIdentifier, tableIdentifiers.get(0));
+ }
+
+ @Test
+ void testLanceTableFormat() {
+ String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+ Column[] columns = createColumns();
+ NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+ Map<String, String> properties = createProperties();
+ String tableLocation = tempDirectory + "/" + tableName;
+ properties.put("format", "lance");
+ properties.put("location", tableLocation);
+
+ catalog
+ .asTableCatalog()
+ .createTable(
+ nameIdentifier,
+ columns,
+ TABLE_COMMENT,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ null,
+ null);
+
+ // Now try to read the lance directory and check it.
+ try (Dataset dataset = Dataset.open(tableLocation)) {
+ org.apache.arrow.vector.types.pojo.Schema lanceSchema =
dataset.getSchema();
+ List<Field> fields = lanceSchema.getFields();
+ for (Field field : fields) {
+ if (field.getName().equals(LANCE_COL_NAME1)) {
+ Assertions.assertEquals(new ArrowType.Int(32, true),
field.getType());
+ } else if (field.getName().equals(LANCE_COL_NAME2)) {
+ Assertions.assertEquals(new ArrowType.Int(64, true),
field.getType());
+ } else if (field.getName().equals(LANCE_COL_NAME3)) {
+ Assertions.assertEquals(new ArrowType.Utf8(), field.getType());
+ } else {
+ Assertions.fail("Unexpected column name in lance table: " +
field.getName());
+ }
+ }
+
+ // Now try to write some data to the dataset
+ Transaction trans =
+ dataset
+ .newTransactionBuilder()
+ .operation(
+ Append.builder()
+ .fragments(
+ createFragmentMetadata(
+ tableLocation,
+ Arrays.asList(
+ new LanceDataValue(1, 100L, "first"),
+ new LanceDataValue(2, 200L, "second"),
+ new LanceDataValue(3, 300L, "third")),
+ lanceSchema))
+ .build())
+ .writeParams(ImmutableMap.of())
+ .build();
+
+ Dataset newDataset = dataset.commitTransaction(trans);
+ try (LanceScanner scanner =
+ newDataset.newScan(
+ new ScanOptions.Builder()
+ .columns(Arrays.asList(LANCE_COL_NAME1, LANCE_COL_NAME2,
LANCE_COL_NAME3))
+ .batchSize(1000)
+ .build())) {
+
+ List<LanceDataValue> dataValues = Lists.newArrayList();
+ try (ArrowReader reader = scanner.scanBatches()) {
+ while (reader.loadNextBatch()) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+
+ IntVector col1Vector = (IntVector) fieldVectors.get(0);
+ BigIntVector col2Vector = (BigIntVector) fieldVectors.get(1);
+ VarCharVector col3Vector = (VarCharVector) fieldVectors.get(2);
+
+ for (int i = 0; i < root.getRowCount(); i++) {
+ int col1 = col1Vector.get(i);
+ long col2 = col2Vector.get(i);
+ String col3 = new String(col3Vector.get(i),
StandardCharsets.UTF_8);
+ dataValues.add(new LanceDataValue(col1, col2, col3));
+ }
+ }
+ }
+
+ Assertions.assertEquals(3, dataValues.size());
+ Assertions.assertEquals(1, dataValues.get(0).col1);
+ Assertions.assertEquals(100L, dataValues.get(0).col2);
+ Assertions.assertEquals("first", dataValues.get(0).col3);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static class LanceDataValue {
+ public Integer col1;
+ public Long col2;
+ public String col3;
+
+ public LanceDataValue(Integer col1, Long col2, String col3) {
+ this.col1 = col1;
+ this.col2 = col2;
+ this.col3 = col3;
+ }
+ }
+
+ private List<FragmentMetadata> createFragmentMetadata(
+ String tableLocation,
+ List<LanceDataValue> updates,
+ org.apache.arrow.vector.types.pojo.Schema schema)
+ throws JsonProcessingException {
+ List<FragmentMetadata> fragmentMetas;
+ int count = 0;
+ RootAllocator rootAllocator = new RootAllocator();
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
rootAllocator)) {
+ for (FieldVector vector : root.getFieldVectors()) {
+ vector.setInitialCapacity(count);
+ }
+ root.allocateNew();
+
+ IntVector col1Vector = (IntVector) root.getVector(LANCE_COL_NAME1);
+ BigIntVector col2Vector = (BigIntVector) root.getVector(LANCE_COL_NAME2);
+ VarCharVector col3Vector = (VarCharVector)
root.getVector(LANCE_COL_NAME3);
+
+ int index = 0;
+ for (LanceDataValue data : updates) {
+ col1Vector.setSafe(index, data.col1);
+ col2Vector.setSafe(index, data.col2);
+ col3Vector.setSafe(index, data.col3.getBytes(StandardCharsets.UTF_8));
+ index++;
+ }
+ root.setRowCount(index);
+
+ fragmentMetas =
+ Fragment.create(tableLocation, rootAllocator, root, new
WriteParams.Builder().build());
+ return fragmentMetas;
+ }
+ }
+
+ protected Map<String, String> createSchemaProperties() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("key1", "val1");
+ properties.put("key2", "val2");
+ return properties;
+ }
+
+ private void columnEquals(Column[] expect, Column[] actual) {
+ Assertions.assertEquals(expect.length, actual.length);
+
+ for (int i = 0; i < expect.length; i++) {
+ Column expectCol = expect[i];
+ Column actualCol = actual[i];
+
+ Assertions.assertEquals(expectCol.name(), actualCol.name());
+ Assertions.assertEquals(expectCol.dataType(), actualCol.dataType());
+ Assertions.assertEquals(expectCol.comment(), actualCol.comment());
+ }
+ }
+
+ private void createMetalake() {
+ GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+ Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+ client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+ GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+ Assertions.assertEquals(metalakeName, loadMetalake.name());
+
+ metalake = loadMetalake;
+ }
+
+ protected void createCatalog() {
+ Map<String, String> properties = Maps.newHashMap();
+ metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider,
"comment", properties);
+
+ catalog = metalake.loadCatalog(catalogName);
+ }
+
+ private void createSchema() throws InterruptedException {
+ Map<String, String> schemaProperties = createSchemaProperties();
+ String comment = "comment";
+ catalog.asSchemas().createSchema(schemaName, comment, schemaProperties);
+ Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+ Assertions.assertEquals(schemaName, loadSchema.name());
+ Assertions.assertEquals(comment, loadSchema.comment());
+ Assertions.assertEquals("val1", loadSchema.properties().get("key1"));
+ Assertions.assertEquals("val2", loadSchema.properties().get("key2"));
+ }
+
+ private Column[] createColumns() {
+ Column col1 = Column.of(LANCE_COL_NAME1, Types.IntegerType.get(),
"col_1_comment");
+ Column col2 = Column.of(LANCE_COL_NAME2, Types.LongType.get(),
"col_2_comment");
+ Column col3 = Column.of(LANCE_COL_NAME3, Types.StringType.get(),
"col_3_comment");
+ return new Column[] {col1, col2, col3};
+ }
+
+ protected Map<String, String> createProperties() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key1", "val1");
+ properties.put("key2", "val2");
+
+ return properties;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
index 3501abe10c..c39b8cbabb 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -29,9 +29,9 @@ public class TableVersionBaseSQLProvider {
public String insertTableVersion(@Param("tablePO") TablePO tablePO) {
return "INSERT INTO "
+ TABLE_NAME
- + " (table_id, format, properties, partitioning"
+ + " (table_id, format, properties, partitioning,"
+ " distribution, sort_orders, indexes, comment,"
- + " version, last_version, deleted_at)"
+ + " version, deleted_at)"
+ " VALUES ("
+ " #{tablePO.tableId},"
+ " #{tablePO.format},"
@@ -42,7 +42,6 @@ public class TableVersionBaseSQLProvider {
+ " #{tablePO.indexes},"
+ " #{tablePO.comment},"
+ " #{tablePO.currentVersion},"
- + " #{tablePO.lastVersion},"
+ " #{tablePO.deletedAt}"
+ " )";
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
index e0a7413b1c..13eebeaa2c 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -19,6 +19,41 @@
package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+import static
org.apache.gravitino.storage.relational.mapper.TableVersionMapper.TABLE_NAME;
+
import
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionPostgreSQLProvider extends
TableVersionBaseSQLProvider {
-public class TableVersionPostgreSQLProvider extends
TableVersionBaseSQLProvider {}
+ public String insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO")
TablePO tablePO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (table_id, format, properties, partitioning,"
+ + " distribution, sort_orders, indexes, comment,"
+ + " version, deleted_at)"
+ + " VALUES ("
+ + " #{tablePO.tableId},"
+ + " #{tablePO.format},"
+ + " #{tablePO.properties},"
+ + " #{tablePO.partitions},"
+ + " #{tablePO.distribution},"
+ + " #{tablePO.sortOrders},"
+ + " #{tablePO.indexes},"
+ + " #{tablePO.comment},"
+ + " #{tablePO.currentVersion},"
+ + " #{tablePO.deletedAt}"
+ + " )"
+ + " ON CONFLICT (table_id, deleted_at) DO UPDATE SET"
+ + " format = #{tablePO.format},"
+ + " properties = #{tablePO.properties},"
+ + " partitioning = #{tablePO.partitions},"
+ + " distribution = #{tablePO.distribution},"
+ + " sort_orders = #{tablePO.sortOrders},"
+ + " indexes = #{tablePO.indexes},"
+ + " comment = #{tablePO.comment},"
+ + " version = #{tablePO.currentVersion},"
+ + " deleted_at = #{tablePO.deletedAt}";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index f4bbf7a6f6..7a42d95db6 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -210,6 +210,14 @@ public class TableMetaService {
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.updateTableMeta(newTablePO, oldTablePO,
newSchemaId))),
+ () ->
+ SessionUtils.doWithCommit(
+ TableVersionMapper.class,
+ mapper -> {
+ if (newTablePO.getFormat() != null) {
+
mapper.insertTableVersionOnDuplicateKeyUpdate(newTablePO);
+ }
+ }),
() -> {
if (updateResult.get() > 0 && (isColumnChanged ||
isSchemaChanged)) {
TableColumnMetaService.getInstance()
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 62bc11f891..fa8f06a9f8 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -457,17 +457,35 @@ public class POConverters {
}
try {
- return TablePO.builder()
- .withTableId(oldTablePO.getTableId())
- .withTableName(newTable.name())
- .withMetalakeId(oldTablePO.getMetalakeId())
- .withCatalogId(oldTablePO.getCatalogId())
- .withSchemaId(newSchemaId)
-
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo()))
- .withCurrentVersion(currentVersion)
- .withLastVersion(lastVersion)
- .withDeletedAt(DEFAULT_DELETED_AT)
- .build();
+ TablePO.Builder builder =
+ TablePO.builder()
+ .withTableId(oldTablePO.getTableId())
+ .withTableName(newTable.name())
+ .withMetalakeId(oldTablePO.getMetalakeId())
+ .withCatalogId(oldTablePO.getCatalogId())
+ .withSchemaId(newSchemaId)
+
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo()))
+ .withCurrentVersion(currentVersion)
+ .withLastVersion(lastVersion)
+ .withDeletedAt(DEFAULT_DELETED_AT);
+
+ // Note: GenericTableEntity will be removed in the refactor PR, so here
just keep the old
+ // logic to make the UT pass.
+ if (newTable instanceof GenericTableEntity genericTable) {
+ builder.withFormat(genericTable.getFormat());
+ builder.withComment(genericTable.getComment());
+ builder.withProperties(
+ genericTable.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+ builder.withIndexes(
+ genericTable.getIndexes() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
+ // TODO other fields in the refactor PRs.
+ }
+
+ return builder.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize json object:", e);
}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 143e979f69..5ff904eb31 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -81,6 +81,7 @@ import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
@@ -2792,4 +2793,138 @@ public class TestEntityStorage {
destroy(type);
}
}
+
+ @ParameterizedTest
+ @MethodSource("storageProvider")
+ void testLanceTableCreateAndUpdate(String type) {
+ Config config = Mockito.mock(Config.class);
+ init(type, config);
+
+ AuditInfo auditInfo =
+
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+ try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
+ store.initialize(config);
+
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake",
auditInfo);
+ store.put(metalake, false);
+
+ CatalogEntity catalogEntity =
+ CatalogEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("catalog")
+ .withNamespace(NamespaceUtil.ofCatalog("metalake"))
+ .withType(Catalog.Type.RELATIONAL)
+ .withProvider("generic-lakehouse")
+ .withComment("This is a generic-lakehouse")
+ .withProperties(ImmutableMap.of())
+ .withAuditInfo(auditInfo)
+ .build();
+
+ store.put(catalogEntity, false);
+
+ SchemaEntity schemaEntity =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("schema")
+ .withNamespace(NamespaceUtil.ofSchema("metalake", "catalog"))
+ .withComment("This is a schema for generic-lakehouse")
+ .withProperties(ImmutableMap.of())
+ .withAuditInfo(auditInfo)
+ .build();
+ store.put(schemaEntity, false);
+
+ long column1Id = RandomIdGenerator.INSTANCE.nextId();
+ GenericTableEntity table =
+ GenericTableEntity.getBuilder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withNamespace(NamespaceUtil.ofTable("metalake", "catalog",
"schema"))
+ .withName("table")
+ .withAuditInfo(auditInfo)
+ .withColumns(
+ Lists.newArrayList(
+ ColumnEntity.builder()
+ .withId(column1Id)
+ .withName("column1")
+ .withDataType(Types.StringType.get())
+ .withComment("test column")
+ .withPosition(1)
+ .withAuditInfo(auditInfo)
+ .build()))
+ .withComment("This is a lance table")
+ .withFormat("lance")
+ .withProperties(ImmutableMap.of("location", "/tmp/test",
"format", "lance"))
+ .build();
+ store.put(table, false);
+ GenericTableEntity fetchedTable =
+ store.get(table.nameIdentifier(), Entity.EntityType.TABLE,
GenericTableEntity.class);
+
+ // check table properties
+ Assertions.assertEquals("/tmp/test",
fetchedTable.getProperties().get("location"));
+ Assertions.assertEquals("lance",
fetchedTable.getProperties().get("format"));
+ Assertions.assertEquals("This is a lance table",
fetchedTable.getComment());
+ Assertions.assertEquals(1, fetchedTable.columns().size());
+ Assertions.assertEquals("column1", fetchedTable.columns().get(0).name());
+
+ // Now try to update the table
+ GenericTableEntity updatedTable =
+ GenericTableEntity.getBuilder()
+ .withId(table.id())
+ .withNamespace(table.namespace())
+ .withName(table.name())
+ .withAuditInfo(auditInfo)
+ .withFormat("lance")
+ .withColumns(
+ Lists.newArrayList(
+ ColumnEntity.builder()
+ .withId(column1Id)
+ .withName("column1")
+ .withDataType(Types.StringType.get())
+ .withComment("updated test column")
+ .withPosition(1)
+ .withAuditInfo(auditInfo)
+ .build(),
+ ColumnEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("column2")
+ .withDataType(Types.IntegerType.get())
+ .withComment("new column")
+ .withPosition(2)
+ .withAuditInfo(auditInfo)
+ .build()))
+ .withComment("This is an updated lance table")
+ .withProperties(ImmutableMap.of("location", "/tmp/updated_test",
"format", "lance"))
+ .build();
+
+ store.update(
+ table.nameIdentifier(),
+ GenericTableEntity.class,
+ Entity.EntityType.TABLE,
+ e -> updatedTable);
+ GenericTableEntity fetchedUpdatedTable =
+ store.get(table.nameIdentifier(), Entity.EntityType.TABLE,
GenericTableEntity.class);
+
+ // check updated table properties
+ Assertions.assertEquals(
+ "/tmp/updated_test",
fetchedUpdatedTable.getProperties().get("location"));
+ Assertions.assertEquals("lance",
fetchedUpdatedTable.getProperties().get("format"));
+ Assertions.assertEquals("This is an updated lance table",
fetchedUpdatedTable.getComment());
+ Assertions.assertEquals(2, fetchedUpdatedTable.columns().size());
+ for (ColumnEntity column : fetchedUpdatedTable.columns()) {
+ if (column.name().equals("column1")) {
+ Assertions.assertEquals("updated test column", column.comment());
+ }
+ }
+
+ Assertions.assertTrue(
+ fetchedUpdatedTable.columns().stream()
+ .filter(c -> c.name().equals("column2"))
+ .findFirst()
+ .isPresent());
+ destroy(type);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}