This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new e90fe35cb6 [#8939][#9093] Improvement(lance-catalog): Fix register
table endpoint and make Lance operations atomic (#9216)
e90fe35cb6 is described below
commit e90fe35cb600813a504efdf39d6e9909ae422ad7
Author: Junda Yang <[email protected]>
AuthorDate: Fri Nov 21 21:19:48 2025 -0800
[#8939][#9093] Improvement(lance-catalog): Fix register table endpoint and
make Lance operations atomic (#9216)
### What changes were proposed in this pull request?
This PR includes two improvements to the Lance catalog:
- Fixed the Lance register table endpoint:
LanceTableOperations.registerTable() now adds the "register": "true"
property, which is passed to GenericLakehouseCatalogOperations to ensure
the table is registered without creating a physical storage location.
- Made Lance operations atomic: Implemented a proper rollback mechanism
in GenericLakehouseCatalogOperations that removes the metadata from the
Gravitino store if Lance dataset creation fails, preventing orphaned
metadata entries.
### Why are the changes needed?
1. Register table fix: Without the "register": "true" property, the
Lance catalog incorrectly created physical table locations during
registration instead of only registering metadata for existing tables.
2. Atomic operations: Previously, if Lance dataset creation failed, the
metadata remained in the Gravitino store, so subsequent retry attempts
failed because the table already existed in the metadata. This left the
system in an inconsistent state where metadata existed but the actual
dataset didn't.
Fix: #8939
Fix: #9093
### Does this PR introduce _any_ user-facing change?
No. These are internal implementation fixes that improve reliability
without changing user-facing APIs.
### How was this patch tested?
- Added new integration tests in CatalogGenericLakehouseLanceIT to
verify atomic operations and proper cleanup on failure
- Added unit tests in TestLanceNamespaceOperations to verify register
table endpoint behavior
- Added integration tests in LanceRESTServiceIT to verify end to end
that creating a table with an invalid location fails and that a retry
with a corrected location succeeds
---------
Co-authored-by: Mini Yu <[email protected]>
---
.../GenericLakehouseCatalogOperations.java | 78 +++++++++++++---------
.../test/CatalogGenericLakehouseLanceIT.java | 30 +++++++++
.../lance/service/rest/LanceTableOperations.java | 1 +
.../lance/integration/test/LanceRESTServiceIT.java | 20 ++++++
.../service/rest/TestLanceNamespaceOperations.java | 41 ++++++++++++
5 files changed, 139 insertions(+), 31 deletions(-)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 960ff5c18e..b84280df69 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -266,9 +266,8 @@ public class GenericLakehouseCatalogOperations
i -> ColumnEntity.toColumnEntity(columns[i], i,
idGenerator.nextId(), auditInfo))
.collect(Collectors.toList());
- TableEntity entityToStore;
try {
- entityToStore =
+ TableEntity entityToStore =
TableEntity.builder()
.withName(ident.name())
.withNamespace(ident.namespace())
@@ -284,41 +283,58 @@ public class GenericLakehouseCatalogOperations
.withAuditInfo(auditInfo)
.build();
store.put(entityToStore);
+ } catch (EntityAlreadyExistsException e) {
+ throw new TableAlreadyExistsException(e, "Table %s already exists in the
metadata", ident);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create table metadata for " +
ident, e);
+ }
- // Get the value of register in table properties
- boolean register =
- Boolean.parseBoolean(
- properties.getOrDefault(
- GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_REGISTER,
"false"));
- if (register) {
- // Do not need to create the physical table if this is a registration
operation.
- // Whether we need to check the existence of the physical table?
- GenericLakehouseTable.Builder builder =
GenericLakehouseTable.builder();
- return builder
- .withName(ident.name())
- .withColumns(columns)
- .withComment(comment)
- .withProperties(properties)
- .withDistribution(distribution)
- .withIndexes(indexes)
- .withAuditInfo(
- AuditInfo.builder()
- .withCreator(PrincipalUtils.getCurrentUserName())
- .withCreateTime(Instant.now())
- .build())
- .withPartitioning(partitions)
- .withSortOrders(sortOrders)
- .withFormat(LakehouseTableFormat.LANCE.lowerName())
- .build();
- }
+ // Get the value of register in table properties
+ boolean register =
+ Boolean.parseBoolean(
+ properties.getOrDefault(
+ GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_REGISTER,
"false"));
+ if (register) {
+ // Do not need to create the physical table if this is a registration
operation.
+ // Whether we need to check the existence of the physical table?
+ GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
+ return builder
+ .withName(ident.name())
+ .withColumns(columns)
+ .withComment(comment)
+ .withProperties(properties)
+ .withDistribution(distribution)
+ .withIndexes(indexes)
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build())
+ .withPartitioning(partitions)
+ .withSortOrders(sortOrders)
+ .withFormat(LakehouseTableFormat.LANCE.lowerName())
+ .build();
+ }
+ try {
LakehouseCatalogOperations lanceCatalogOperations =
getLakehouseCatalogOperations(newProperties);
return lanceCatalogOperations.createTable(
ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
- } catch (EntityAlreadyExistsException e) {
- throw new TableAlreadyExistsException(e, "Table %s already exists",
ident);
- } catch (IOException e) {
+ } catch (Exception e) {
+ // Try to roll back the metadata entry in Gravitino store
+ try {
+ store.delete(ident, Entity.EntityType.TABLE);
+ } catch (IOException ioException) {
+ LOG.error(
+ "Failed to roll back the metadata entry for table {} after
physical table creation failure.",
+ ident,
+ ioException);
+ }
+ if (e.getClass().isAssignableFrom(RuntimeException.class)) {
+ throw e;
+ }
+
throw new RuntimeException("Failed to create table " + ident, e);
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
index ca1e8ca003..0c92b43eef 100644
---
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
+++
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
@@ -53,6 +53,7 @@ import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.Column;
@@ -255,6 +256,35 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT
{
Arrays.asList(catalog.asTableCatalog().listTables(nameIdentifier.namespace()));
Assertions.assertEquals(1, tableIdentifiers.size());
Assertions.assertEquals(nameIdentifier, tableIdentifiers.get(0));
+
+ // Now try to simulate the location of lance table does not exist.
+ Map<String, String> newProperties = createProperties();
+ newProperties.put("format", "lance");
+ // Use a wrong location to let the table creation fail
+ newProperties.put("location", "hdfs://localhost:9000/wrong_location");
+
+ String nameNew = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+ NameIdentifier newNameIdentifier = NameIdentifier.of(schemaName, nameNew);
+ Exception e =
+ Assertions.assertThrows(
+ Exception.class,
+ () -> {
+ catalog
+ .asTableCatalog()
+ .createTable(
+ newNameIdentifier,
+ columns,
+ TABLE_COMMENT,
+ newProperties,
+ Transforms.EMPTY_TRANSFORM,
+ null,
+ null);
+ });
+
+ Assertions.assertTrue(e.getMessage().contains("Invalid user input"));
+
+ Assertions.assertThrows(
+ NoSuchTableException.class, () ->
catalog.asTableCatalog().loadTable(newNameIdentifier));
}
@Test
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 290730f39a..bbef8e3b6a 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -162,6 +162,7 @@ public class LanceTableOperations {
? Maps.newHashMap()
: Maps.newHashMap(registerTableRequest.getProperties());
props.put(LANCE_LOCATION, registerTableRequest.getLocation());
+ props.put("register", "true");
ModeEnum mode = registerTableRequest.getMode();
RegisterTableResponse response =
diff --git
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
index c7454442b6..a63bb39307 100644
---
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
+++
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
@@ -440,6 +440,26 @@ public class LanceRESTServiceIT extends BaseIT {
});
Assertions.assertTrue(exception.getMessage().contains("Table already
exists"));
Assertions.assertEquals(409, exception.getCode());
+
+ // Try to create a table with wrong location should fail
+ CreateEmptyTableRequest wrongLocationRequest = new
CreateEmptyTableRequest();
+ wrongLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME,
"wrong_location_table"));
+ wrongLocationRequest.setLocation("hdfs://localhost:9000/invalid_path/");
+ LanceNamespaceException apiException =
+ Assertions.assertThrows(
+ LanceNamespaceException.class,
+ () -> {
+ ns.createEmptyTable(wrongLocationRequest);
+ });
+ Assertions.assertTrue(apiException.getMessage().contains("Invalid user
input"));
+
+ // Correct the location and try again
+ String correctedLocation = tempDir + "/" + "wrong_location_table/";
+ wrongLocationRequest.setLocation(correctedLocation);
+ CreateEmptyTableResponse wrongLocationResponse =
+ Assertions.assertDoesNotThrow(() ->
ns.createEmptyTable(wrongLocationRequest));
+ Assertions.assertNotNull(wrongLocationResponse);
+ Assertions.assertEquals(correctedLocation,
wrongLocationResponse.getLocation());
}
@Test
diff --git
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
index efe1f90436..0fc4df9cfd 100644
---
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
+++
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
@@ -502,6 +502,47 @@ public class TestLanceNamespaceOperations extends
JerseyTest {
Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
}
+ @Test
+ void testRegisterTableSetsRegisterPropertyToTrue() {
+ String tableIds = "catalog.scheme.register_table_with_property";
+ String delimiter = ".";
+
+ // Reset mock to clear any previous test state
+ Mockito.reset(tableOps);
+
+ // Test that the "register" property is set to "true"
+ RegisterTableResponse registerTableResponse = new RegisterTableResponse();
+ registerTableResponse.setLocation("/path/to/registered_table");
+ registerTableResponse.setProperties(ImmutableMap.of("key", "value",
"register", "true"));
+ when(tableOps.registerTable(any(), any(), any(),
any())).thenReturn(registerTableResponse);
+
+ RegisterTableRequest tableRequest = new RegisterTableRequest();
+ tableRequest.setLocation("/path/to/registered_table");
+ tableRequest.setMode(RegisterTableRequest.ModeEnum.CREATE);
+ tableRequest.setProperties(ImmutableMap.of("custom-key", "custom-value"));
+
+ Response resp =
+ target(String.format("/v1/table/%s/register", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+
+ // Verify that registerTable was called with properties containing
"register": "true"
+ Mockito.verify(tableOps)
+ .registerTable(
+ eq(tableIds),
+ eq(RegisterTableRequest.ModeEnum.CREATE),
+ eq(delimiter),
+ Mockito.argThat(
+ props ->
+ props != null
+ && "true".equals(props.get("register"))
+ &&
"/path/to/registered_table".equals(props.get("location"))
+ && "custom-value".equals(props.get("custom-key"))));
+ }
+
@Test
void testDeregisterTable() {
String tableIds = "catalog.scheme.deregister_table";